use std::{path::PathBuf, time::Duration};
use anyhow::bail;
use itertools::Itertools;
use rust_htslib::{
bam,
bcf::{self, Read},
};
use tracing::{debug, instrument};
use url::Url;
use crate::{
RunnableCommand,
common::{
aligner::RUNNING,
list_of_regions::{Regions, RegionsDefinition},
reference::ReferenceReader,
},
vcf::{
cli::{Command, OutputOptions},
pipeline::{
reader::VCFReader,
writer::{OutputWriter, region_writer::RegionWriter},
},
},
};
pub mod cli;
pub mod pipeline;
pub mod strings;
impl RunnableCommand for Command {
    /// Runs the `vcf` subcommand end to end.
    ///
    /// Validates the options, then opens:
    /// - the input (restricted to the requested regions when any are given),
    /// - the output writer (whose header is extended by [`augment_header`]),
    /// - the reference, the targets and the regions writer,
    /// - the optional BAM file.
    ///
    /// Everything is then handed to [`pipeline::run`].
    ///
    /// # Errors
    /// Returns an error if:
    /// - validation fails,
    /// - any input or output (including the BAM file) cannot be opened,
    /// - the pipeline itself fails.
    #[instrument(name = "vcf", skip_all)]
    async fn run(self) -> anyhow::Result<()> {
        // Log the number of running tasks once per second. The guard aborts this monitor when
        // `run` returns, including early `?` returns, so it never outlives the command.
        let _monitor = AbortOnDrop(tokio::spawn(async {
            let mut interval = tokio::time::interval(Duration::from_secs(1));
            loop {
                interval.tick().await;
                let r = RUNNING.load(std::sync::atomic::Ordering::Relaxed);
                debug!("Running tasks: {r}");
            }
        }));
        self.validate()?;
        // Synchronous open calls are wrapped in `block_in_place` so they do not stall other
        // tasks on this worker. Note that `block_in_place` panics on a current-thread runtime.
        let regions = self.regions.read_regions().await?;
        let input = tokio::task::block_in_place(move || open_input_file(&self.input, regions))?;
        let mut header = bcf::Header::from_template(input.header());
        augment_header(&mut header, self.no_version);
        let output = tokio::task::block_in_place(move || open_output_file(&self.output, &header))?;
        let reference =
            tokio::task::block_in_place(move || ReferenceReader::try_from(&self.reference))?;
        let targets = self.targets.read_regions().await?;
        let region_output = RegionWriter::from_parameter(self.ts_regions.as_deref()).await?;
        let bam_f = tokio::task::block_in_place(move || open_bam_file(self.bam_f.as_deref()))?;
        pipeline::run(
            input,
            output,
            reference,
            targets,
            region_output,
            self.pipeline_settings,
            bam_f,
        )
        .await?;
        Ok(())
    }
}

/// Aborts the wrapped Tokio task when dropped, tying a background task's lifetime to a scope.
struct AbortOnDrop<T>(tokio::task::JoinHandle<T>);

impl<T> Drop for AbortOnDrop<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

impl Command {
    /// Rejects option combinations that would conflict at run time.
    ///
    /// # Errors
    /// Fails when both of these hold, since both streams would then be written to stdout:
    /// - the VCF output goes to stdout (no output file, or `-`),
    /// - the regions output (`ts_regions`) is also `-`.
    fn validate(&self) -> anyhow::Result<()> {
        if self.output.output_file.as_ref().is_none_or(|o| o == "-")
            && self.ts_regions.as_ref().is_some_and(|o| o == "-")
        {
            bail!("Cannot output both vcf and regions output on stdout.");
        }
        Ok(())
    }
}

/// Adds this tool's own lines to the output VCF `header`.
///
/// Always adds an INFO definition for `VCF_TS_CIGARETS_KEY` (described by
/// `VCF_TS_CIGARETS_DESC`). Unless `no_version` is set, it also adds two header lines:
/// - `VCF_HEADER_TWITCHER_VERSION_KEY`, set to `TWITCHER_VERSION`,
/// - `VCF_HEADER_TWITCHER_CMD`, set to the process arguments joined by spaces.
fn augment_header(header: &mut bcf::Header, no_version: bool) {
    // All header keys/descriptions live in `strings`; the glob import keeps the inline
    // format strings below readable.
    #[allow(clippy::wildcard_imports)]
    use strings::*;
    header.push_record(
        format!(
            "##INFO=<ID={VCF_TS_CIGARETS_KEY},Number=1,Type=String,Description=\"{VCF_TS_CIGARETS_DESC}\">"
        )
        .as_bytes(),
    );
    if !no_version {
        header.push_record(
            format!("##{VCF_HEADER_TWITCHER_VERSION_KEY}={TWITCHER_VERSION}").as_bytes(),
        );
        header.push_record(
            format!("##{VCF_HEADER_TWITCHER_CMD}={}", std::env::args().join(" ")).as_bytes(),
        );
    }
}

/// Opens the optional indexed BAM file given on the command line.
///
/// Returns `Ok(None)` when no path was given.
///
/// # Errors
/// Returns an error naming the path if htslib cannot open the BAM file (or its index).
fn open_bam_file(bam_f: Option<&str>) -> anyhow::Result<Option<bam::IndexedReader>> {
    bam_f
        .map(|path| {
            bam::IndexedReader::from_path(path).map_err(|e| {
                anyhow::Error::new(e).context(format!("Failed to open BAM file {path}"))
            })
        })
        .transpose()
}
/// Opens the input VCF/BCF from a local path, a URL, or stdin (`"-"`).
///
/// Any `input` that [`Url::parse`] accepts is opened as a URL. Everything else is treated
/// as a local path.
///
/// - Without `regions`, the whole input is streamed, and stdin is allowed.
/// - With `regions`, an indexed reader is opened and each region's contig is resolved to its
///   id in the input header. Stdin is rejected because it cannot be indexed.
///
/// # Errors
/// Fails when htslib cannot open the input, or when regions are requested for stdin.
///
/// # Panics
/// Panics if a region names a contig that `name2rid` cannot find in the input header.
fn open_input_file(input: &str, regions: Option<Regions>) -> anyhow::Result<VCFReader> {
    let Some(regions) = regions else {
        let streaming = if input == "-" {
            bcf::Reader::from_stdin()?
        } else {
            match Url::parse(input) {
                Ok(url) => bcf::Reader::from_url(&url)?,
                Err(_) => bcf::Reader::from_path(input)?,
            }
        };
        return Ok(VCFReader::entire_file(streaming));
    };

    if input == "-" {
        bail!("Cannot create indexed reader from stdin");
    }
    let indexed = match Url::parse(input) {
        Ok(url) => bcf::IndexedReader::from_url(&url)?,
        Err(_) => bcf::IndexedReader::from_path(input)?,
    };
    let linear = regions
        .into_linear(|chr| indexed.header().name2rid(chr.as_ref()).unwrap() as usize);
    Ok(VCFReader::with_regions(indexed, linear))
}
/// Creates the VCF/BCF writer described by `output` and wraps it in an [`OutputWriter`].
///
/// Destination:
/// - No output file, or `"-"`, writes uncompressed VCF to stdout.
/// - Otherwise the destination is opened as a URL if [`Url::parse`] accepts it, else as a
///   local path. Only the path component is inspected to pick the format and compression.
///
/// Format and compression, decided from that path:
/// - a file name containing `.vcf` (or no file name at all) selects VCF; anything else
///   selects BCF,
/// - a `.gz` or `.bgz` extension enables compression.
///
/// Filtering mode:
/// - When `realigned_and_context` is set, a buffered writer is built from that value.
/// - Otherwise a native writer honouring `only_realigned` is built.
///
/// # Errors
/// Fails if htslib cannot open the destination, or if `realigned_and_context` does not fit
/// in an `i64`.
fn open_output_file(output: &OutputOptions, header: &bcf::Header) -> anyhow::Result<OutputWriter> {
    let destination = output.output_file.as_deref().filter(|dest| *dest != "-");
    let inner = match destination {
        None => bcf::Writer::from_stdout(header, true, bcf::Format::Vcf)?,
        Some(dest) => {
            let url = Url::parse(dest).ok();
            let path = PathBuf::from(match &url {
                Some(url) => url.path(),
                None => dest,
            });
            let is_compressed = path
                .extension()
                .and_then(|ext| ext.to_str())
                .is_some_and(|ext| ext == "gz" || ext == "bgz");
            let is_vcf = path
                .file_name()
                .is_none_or(|name| name.to_str().is_some_and(|name| name.contains(".vcf")));
            let format = if is_vcf {
                bcf::Format::Vcf
            } else {
                bcf::Format::Bcf
            };
            match url {
                Some(url) => bcf::Writer::from_url(&url, header, !is_compressed, format)?,
                None => bcf::Writer::from_path(path, header, !is_compressed, format)?,
            }
        }
    };

    let filter = &output.output_filter;
    let writer = match filter.realigned_and_context {
        Some(plus_minus) => OutputWriter::new_buffered(inner, i64::try_from(plus_minus)?),
        None => OutputWriter::new_native(inner, filter.only_realigned),
    };
    Ok(writer)
}
#[cfg(test)]
mod tests {
    use std::io::Read as _;

    use rust_htslib::{
        bcf::{Header, Read},
        bgzf,
    };

    use super::*;
    use crate::common::reference::CliReferenceArg;

    /// Small VCF fixture shared by these tests.
    const TEST_VCF: &str = "./test_files/test.vcf";

    /// Writes the fixture's header through [`open_output_file`] to `file_name` inside `dir`.
    ///
    /// Returns the written path. The writer is dropped before returning so the caller can
    /// read the finished file.
    fn write_fixture_header(dir: &tempfile::TempDir, file_name: &str) -> String {
        let reader = open_input_file(TEST_VCF, None).unwrap();
        let header = Header::from_template(reader.header());
        let path = format!("{}/{file_name}", dir.path().to_str().unwrap());
        drop(open_output_file(&Some(&path).into(), &header).unwrap());
        path
    }

    /// Reads the whole file at `path` through htslib's BGZF reader.
    fn read_bgzf(path: &str) -> Vec<u8> {
        let mut contents = Vec::new();
        bgzf::Reader::from_path(path)
            .unwrap()
            .read_to_end(&mut contents)
            .unwrap();
        contents
    }

    #[test]
    fn validate_rejects_both_outputs_on_stdout() {
        let command = Command {
            output: None::<&str>.into(),
            ts_regions: Some("-".into()),
            ..Default::default()
        };
        assert!(command.validate().is_err());
    }

    #[test]
    fn validate_allows_vcf_on_file_regions_on_stdout() {
        let command = Command {
            output: Some("/tmp/out.vcf").into(),
            ts_regions: Some("-".into()),
            ..Default::default()
        };
        assert!(command.validate().is_ok());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_pipeline() {
        let dir = tempfile::tempdir().unwrap();
        let outpath = format!("{}/out.vcf", dir.path().to_str().unwrap());
        Command {
            input: TEST_VCF.to_string(),
            reference: CliReferenceArg::from("./test_files/test.fa"),
            output: Some(outpath.clone()).into(),
            ..Default::default()
        }
        .run()
        .await
        .unwrap();
        let record_count = std::fs::read_to_string(&outpath)
            .unwrap()
            .lines()
            .filter(|line| !line.starts_with('#'))
            .count();
        assert_eq!(record_count, 2);
    }

    #[test]
    fn test_auto_detect_output_format_vcf() {
        let dir = tempfile::tempdir().unwrap();
        let path = write_fixture_header(&dir, "out.vcf");
        let written = std::fs::read_to_string(path).unwrap();
        assert!(written.starts_with("##fileformat=VCFv4"));
    }

    #[test]
    fn test_auto_detect_output_format_vcf_gz() {
        let dir = tempfile::tempdir().unwrap();
        let path = write_fixture_header(&dir, "out.vcf.gz");
        assert!(read_bgzf(&path).starts_with(b"##fileformat=VCFv4"));
    }

    #[test]
    fn test_auto_detect_output_format_bcf() {
        let dir = tempfile::tempdir().unwrap();
        let path = write_fixture_header(&dir, "out.bcf");
        assert!(read_bgzf(&path).starts_with(b"BCF"));
    }

    #[test]
    fn test_auto_detect_output_format_bcf_gz() {
        let dir = tempfile::tempdir().unwrap();
        let path = write_fixture_header(&dir, "out.bcf.gz");
        assert!(read_bgzf(&path).starts_with(b"BCF"));
    }
}