use std::path::Path;
use futures::StreamExt;
use futures::stream::BoxStream;
use noodles::{bcf, vcf, vcf::Header, vcf::variant::RecordBuf};
use tokio::io::{AsyncBufRead, AsyncWrite, AsyncWriteExt};
use crate::annotate::seqvars::{AnnotatedVariant, AsyncAnnotatedVariantWriter};
use crate::common::io::tokio::{open_read_maybe_bgzf, open_write_maybe_bgzf};
/// Asynchronous noodles VCF reader over a pinned, boxed `AsyncBufRead + Send` trait object.
pub type AsyncVcfReader = vcf::r#async::io::Reader<std::pin::Pin<Box<dyn AsyncBufRead + Send>>>;
/// Asynchronous noodles BCF reader over a pinned, boxed `AsyncBufRead + Send` trait object.
pub type AsyncBcfReader = bcf::r#async::io::Reader<std::pin::Pin<Box<dyn AsyncBufRead + Send>>>;
/// Asynchronous noodles VCF writer over a pinned, boxed `AsyncWrite + Send` trait object.
pub type AsyncVcfWriter = vcf::r#async::io::Writer<std::pin::Pin<Box<dyn AsyncWrite + Send>>>;
/// Asynchronous noodles BCF writer over a pinned, boxed `AsyncWrite + Send` trait object.
pub type AsyncBcfWriter = bcf::r#async::io::Writer<std::pin::Pin<Box<dyn AsyncWrite + Send>>>;
pub async fn open_vcf_reader(path: impl AsRef<Path>) -> anyhow::Result<AsyncVcfReader> {
let stream = open_read_maybe_bgzf(path).await?;
Ok(vcf::r#async::io::Reader::new(stream))
}
/// Opens `path` through [`open_read_maybe_bgzf`] and converts the resulting
/// stream into an [`AsyncBcfReader`] via its `From` implementation.
///
/// # Errors
///
/// Returns whatever error [`open_read_maybe_bgzf`] reports for `path`.
pub async fn open_bcf_reader(path: impl AsRef<Path>) -> anyhow::Result<AsyncBcfReader> {
    let reader: AsyncBcfReader = open_read_maybe_bgzf(path).await?.into();
    Ok(reader)
}
pub async fn open_vcf_writer(path: impl AsRef<Path>) -> anyhow::Result<AsyncVcfWriter> {
let stream = open_write_maybe_bgzf(path).await?;
Ok(vcf::r#async::io::Writer::new(stream))
}
/// Opens `path` through [`open_write_maybe_bgzf`] and converts the resulting
/// sink into an [`AsyncBcfWriter`] via its `From` implementation.
///
/// # Errors
///
/// Returns whatever error [`open_write_maybe_bgzf`] reports for `path`.
pub async fn open_bcf_writer(path: impl AsRef<Path>) -> anyhow::Result<AsyncBcfWriter> {
    let writer: AsyncBcfWriter = open_write_maybe_bgzf(path).await?.into();
    Ok(writer)
}
/// A variant file reader that is either a VCF or a BCF reader.
pub enum VariantReader {
/// Reader for VCF input.
Vcf(AsyncVcfReader),
/// Reader for BCF input.
Bcf(AsyncBcfReader),
}
/// Common asynchronous interface for reading a header and a stream of
/// [`RecordBuf`] values from a variant reader.
pub trait NoodlesVariantReader {
/// Reads the header from the underlying reader.
// NOTE(review): `async_fn_in_trait` warns that auto-trait bounds (e.g. `Send`)
// cannot be placed on the returned future of a public async trait method;
// presumably no caller needs such bounds here — confirm.
#[allow(async_fn_in_trait)]
async fn read_header(&mut self) -> anyhow::Result<Header>;
/// Returns a boxed stream of record results that borrows both the reader and
/// `header` for the lifetime `'a`.
// NOTE(review): same lint suppression as on `read_header` — confirm it is intended.
#[allow(async_fn_in_trait)]
async fn records<'a>(
&'a mut self,
header: &'a Header,
) -> BoxStream<'a, anyhow::Result<RecordBuf>>;
}
impl NoodlesVariantReader for VariantReader {
    /// Reads the header by delegating to the wrapped VCF or BCF reader; any
    /// read error is converted into an [`anyhow::Error`].
    async fn read_header(&mut self) -> anyhow::Result<Header> {
        let parsed = match self {
            Self::Vcf(reader) => reader.read_header().await?,
            Self::Bcf(reader) => reader.read_header().await?,
        };
        Ok(parsed)
    }

    /// Streams the remaining records as [`RecordBuf`] values.
    ///
    /// The VCF variant yields record buffers directly from `record_bufs`.
    /// The BCF variant converts every raw record with
    /// `RecordBuf::try_from_variant_record` using `header`; a failed read or a
    /// failed conversion is yielded as an `Err` item.
    async fn records<'a>(
        &'a mut self,
        header: &'a Header,
    ) -> BoxStream<'a, anyhow::Result<RecordBuf>> {
        match self {
            Self::Vcf(reader) => {
                let parsed = reader.record_bufs(header);
                parsed
                    .map(|item| -> anyhow::Result<RecordBuf> { Ok(item?) })
                    .boxed()
            }
            Self::Bcf(reader) => {
                let raw_records = reader.records();
                raw_records
                    .map(move |item| -> anyhow::Result<RecordBuf> {
                        let raw = item?;
                        let converted = RecordBuf::try_from_variant_record(header, &raw)?;
                        Ok(converted)
                    })
                    .boxed()
            }
        }
    }
}
/// Opens `path` as a [`VariantReader`], choosing the format from the file
/// extension.
///
/// An extension of exactly `bcf` (case-sensitive) selects the BCF reader.
/// Every other path, including one with no extension or a non-UTF-8
/// extension, is opened with the VCF reader.
///
/// # Errors
///
/// Returns the error of the selected `open_*_reader` function.
pub async fn open_variant_reader(path: impl AsRef<Path>) -> anyhow::Result<VariantReader> {
    let is_bcf = path.as_ref().extension().and_then(|ext| ext.to_str()) == Some("bcf");
    if is_bcf {
        let reader = open_bcf_reader(path).await?;
        Ok(VariantReader::Bcf(reader))
    } else {
        let reader = open_vcf_reader(path).await?;
        Ok(VariantReader::Vcf(reader))
    }
}
/// A variant file writer that is either a VCF or a BCF writer.
pub enum VariantWriter {
/// Writer for VCF output.
Vcf(AsyncVcfWriter),
/// Writer for BCF output.
Bcf(AsyncBcfWriter),
}
/// Opens `path` as a [`VariantWriter`], choosing the format from the file
/// extension.
///
/// An extension of exactly `bcf` (case-sensitive) selects the BCF writer.
/// Every other path, including one with no extension or a non-UTF-8
/// extension, is opened with the VCF writer.
///
/// # Errors
///
/// Returns the error of the selected `open_*_writer` function.
pub async fn open_variant_writer(path: impl AsRef<Path>) -> anyhow::Result<VariantWriter> {
    let is_bcf = path.as_ref().extension().and_then(|ext| ext.to_str()) == Some("bcf");
    if is_bcf {
        let writer = open_bcf_writer(path).await?;
        Ok(VariantWriter::Bcf(writer))
    } else {
        let writer = open_vcf_writer(path).await?;
        Ok(VariantWriter::Vcf(writer))
    }
}
impl AsyncAnnotatedVariantWriter for VariantWriter {
    /// Writes `header` with the wrapped VCF or BCF writer; a write error is
    /// converted into an [`anyhow::Error`].
    async fn write_noodles_header(&mut self, header: &Header) -> Result<(), anyhow::Error> {
        match self {
            Self::Vcf(writer) => writer.write_header(header).await?,
            Self::Bcf(writer) => writer.write_header(header).await?,
        }
        Ok(())
    }

    /// Forwards `record` to the `write_annotated_record` implementation of the
    /// wrapped writer and returns its result.
    async fn write_annotated_record(
        &mut self,
        header: &Header,
        record: AnnotatedVariant,
    ) -> Result<(), anyhow::Error> {
        match self {
            Self::Vcf(writer) => writer.write_annotated_record(header, record).await?,
            Self::Bcf(writer) => writer.write_annotated_record(header, record).await?,
        }
        Ok(())
    }

    /// Calls `shutdown` on the stream underneath the wrapped writer (obtained
    /// through `get_mut`); a failure is converted into an [`anyhow::Error`].
    async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
        match self {
            Self::Vcf(writer) => writer.get_mut().shutdown().await?,
            Self::Bcf(writer) => writer.get_mut().shutdown().await?,
        }
        Ok(())
    }
}