use std::{ops::Range, sync::Arc};
use datafusion::{
datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener},
error::DataFusionError,
};
use exon_vcf::{IndexedAsyncBatchStream, VCFConfig};
use futures::{StreamExt, TryStreamExt};
use noodles::{
bgzf::{self, VirtualPosition},
core::Region,
};
use object_store::{GetOptions, GetRange};
use tokio_util::io::StreamReader;
use crate::{
datasources::indexed_file::indexed_bgzf_file::BGZFIndexedOffsets, error::ExonError,
streaming_bgzf::AsyncBGZFReader,
};
/// A DataFusion [`FileOpener`] for indexed, BGZF-compressed VCF files that hands a single
/// genomic region to the batch stream it builds.
#[derive(Debug)]
pub struct IndexedVCFOpener {
    /// Shared scan configuration; its `object_store` is what the file bytes are fetched from.
    config: Arc<VCFConfig>,
    /// Region passed through to `IndexedAsyncBatchStream` (presumably used to filter
    /// records — confirm against that type).
    region: Arc<Region>,
}

impl IndexedVCFOpener {
    /// Creates an opener from the shared scan configuration and the region to query.
    pub fn new(config: Arc<VCFConfig>, region: Arc<Region>) -> Self {
        Self { region, config }
    }
}
impl FileOpener for IndexedVCFOpener {
    /// Opens `file_meta` and returns a future that resolves to a stream of record batches.
    ///
    /// The VCF header is always read from the start of the object first. A second reader is
    /// then positioned at the first record to scan, depending on `file_meta.extensions`:
    ///
    /// * No extensions: the whole object is fetched again and the reader skips the header.
    /// * [`BGZFIndexedOffsets`] whose end offset has a compressed position of 0: same as the
    ///   no-extension case.
    /// * Any other [`BGZFIndexedOffsets`]: only the compressed byte range beginning at the
    ///   start offset's BGZF block is fetched, and the reader is moved to the start offset.
    ///   When start and end share a BGZF block, the fetch runs to the end of the object and
    ///   the batch stream is given the end offset's uncompressed position via
    ///   `with_max_bytes`.
    ///
    /// # Errors
    ///
    /// The returned future fails if an object-store request fails, the header cannot be
    /// read, seeking inside the BGZF stream fails, or extensions are present but are not
    /// [`BGZFIndexedOffsets`] (`DataFusionError::Internal`).
    fn open(&self, file_meta: FileMeta) -> datafusion::error::Result<FileOpenFuture> {
        tracing::debug!("Opening file: {:?}", file_meta.location());

        let config = Arc::clone(&self.config);
        let region = Arc::clone(&self.region);

        Ok(Box::pin(async move {
            // Read the header from the start of the object and remember where it ends, so
            // full-object readers below can skip straight to the first data record.
            let s = config
                .object_store
                .get(file_meta.location())
                .await?
                .into_stream();
            let stream_reader = Box::pin(s.map_err(DataFusionError::from));
            let stream_reader = StreamReader::new(stream_reader);
            let first_bgzf_reader = bgzf::AsyncReader::new(stream_reader);
            let mut vcf_reader = noodles::vcf::AsyncReader::new(first_bgzf_reader);
            let header = vcf_reader.read_header().await?;
            let header_offset = vcf_reader.get_ref().virtual_position();
            // Only the header was needed from this reader; release its object-store stream
            // now rather than keeping it open for the rest of the open.
            drop(vcf_reader);

            let batch_stream = match file_meta.extensions {
                Some(ref ext) => {
                    let index_offsets =
                        ext.downcast_ref::<BGZFIndexedOffsets>().ok_or_else(|| {
                            DataFusionError::Internal(
                                "Expected index offsets in VCF file extensions".to_string(),
                            )
                        })?;

                    let vp_start = index_offsets.start;
                    let vp_end = index_offsets.end;

                    if vp_end.compressed() == 0 {
                        // Re-read the whole object, skipping the header.
                        let stream = config
                            .object_store
                            .get(file_meta.location())
                            .await?
                            .into_stream()
                            .map_err(DataFusionError::from);
                        let stream_reader = StreamReader::new(Box::pin(stream));
                        let mut async_reader = AsyncBGZFReader::from_reader(stream_reader);
                        async_reader.scan_to_virtual_position(header_offset).await?;

                        let bgzf_reader = async_reader.into_inner();
                        let vcf_reader = noodles::vcf::AsyncReader::new(bgzf_reader);
                        IndexedAsyncBatchStream::new(vcf_reader, config, Arc::new(header), region)
                    } else {
                        let start = vp_start.compressed() as usize;
                        // When start and end are in the same BGZF block, fetch through to the
                        // end of the object; the uncompressed end is applied via
                        // `with_max_bytes` below.
                        // NOTE(review): otherwise the block at `vp_end.compressed()` is not
                        // fetched, so records before `vp_end.uncompressed()` in that block are
                        // only reachable if another partition covers them — confirm against
                        // how `BGZFIndexedOffsets` are built.
                        let end = if vp_start.compressed() == vp_end.compressed() {
                            file_meta.object_meta.size
                        } else {
                            vp_end.compressed() as usize
                        };

                        tracing::info!(
                            "Reading compressed range: {}..{} (uncompressed {}..{}) of {}",
                            start,
                            end,
                            vp_start.uncompressed(),
                            vp_end.uncompressed(),
                            file_meta.location()
                        );

                        let get_options = GetOptions {
                            range: Some(GetRange::Bounded(Range { start, end })),
                            ..Default::default()
                        };
                        let get_response = config
                            .object_store
                            .get_opts(file_meta.location(), get_options)
                            .await?;
                        let stream = get_response.into_stream().map_err(DataFusionError::from);
                        let stream_reader = StreamReader::new(Box::pin(stream));
                        let mut async_reader = AsyncBGZFReader::from_reader(stream_reader);

                        // A start offset of (0, 0) means the range begins with the header.
                        if vp_start.compressed() == 0 && vp_start.uncompressed() == 0 {
                            tracing::debug!("Seeking to header offset: {:?}", header_offset);
                            async_reader.scan_to_virtual_position(header_offset).await?;
                        }

                        // The fetched bytes begin at the start block, so the in-block offset
                        // is addressed relative to compressed position 0.
                        if vp_start.uncompressed() > 0 {
                            let marginal_start_vp =
                                VirtualPosition::try_from((0, vp_start.uncompressed()))
                                    .map_err(ExonError::from)?;
                            async_reader
                                .scan_to_virtual_position(marginal_start_vp)
                                .await?;
                        }

                        let bgzf_reader = async_reader.into_inner();
                        let vcf_reader = noodles::vcf::AsyncReader::new(bgzf_reader);
                        let mut batch_stream = IndexedAsyncBatchStream::new(
                            vcf_reader,
                            config,
                            Arc::new(header),
                            region,
                        );

                        if vp_start.compressed() == vp_end.compressed() {
                            batch_stream =
                                batch_stream.with_max_bytes(vp_end.uncompressed() as usize);
                        }

                        batch_stream
                    }
                }
                None => {
                    // No index offsets: scan the whole object. The fresh stream starts at
                    // byte 0 (inside the header), so always skip past the header. The previous
                    // check inspected the header reader's position, which is never (0, 0)
                    // after `read_header`, so the skip never happened.
                    let get_stream = config
                        .object_store
                        .get(file_meta.location())
                        .await?
                        .into_stream()
                        .map_err(DataFusionError::from);
                    let stream_reader = StreamReader::new(Box::pin(get_stream));
                    let mut async_reader = AsyncBGZFReader::from_reader(stream_reader);
                    tracing::debug!("Seeking to header offset: {:?}", header_offset);
                    async_reader.scan_to_virtual_position(header_offset).await?;

                    let bgzf_reader = async_reader.into_inner();
                    let vcf_reader = noodles::vcf::AsyncReader::new(bgzf_reader);
                    IndexedAsyncBatchStream::new(vcf_reader, config, Arc::new(header), region)
                }
            };

            let batch_stream = batch_stream.into_stream();
            Ok(batch_stream.boxed())
        }))
    }
}