use std::sync::Arc;
use datafusion::{
datasource::physical_plan::{FileMeta, FileOpener},
error::DataFusionError,
};
use exon_cram::{CRAMConfig, IndexedAsyncBatchStream};
use futures::StreamExt;
use object_store::buffered;
use crate::datasources::cram::index::CRAMIndexData;
/// [`FileOpener`] for CRAM files whose read position comes from
/// pre-computed [`CRAMIndexData`] attached to each file.
///
/// The opener itself only holds the shared [`CRAMConfig`]; all per-file
/// state is derived inside `open`.
pub(super) struct IndexedCRAMOpener {
    /// Shared scan configuration (also supplies the object store used to
    /// read the file bytes).
    config: Arc<CRAMConfig>,
}
impl IndexedCRAMOpener {
pub fn new(config: Arc<CRAMConfig>) -> Self {
Self { config }
}
}
impl FileOpener for IndexedCRAMOpener {
fn open(
&self,
file_meta: datafusion::datasource::physical_plan::FileMeta,
) -> datafusion::error::Result<datafusion::datasource::physical_plan::FileOpenFuture> {
let config = Arc::clone(&self.config);
Ok(Box::pin(async move {
let FileMeta {
extensions,
object_meta,
range: _,
} = file_meta;
let index_record = extensions
.as_ref()
.and_then(|ext| ext.downcast_ref::<CRAMIndexData>())
.ok_or(DataFusionError::Execution(
"Invalid index offsets".to_string(),
))?;
tracing::info!(
offset = index_record.offset,
path = ?object_meta.location,
"Reading CRAM file with offset and landmark",
);
let buf_reader =
buffered::BufReader::new(Arc::clone(&config.object_store), &object_meta);
let mut cram_reader = noodles::cram::AsyncReader::new(buf_reader);
cram_reader
.seek(std::io::SeekFrom::Start(index_record.offset))
.await?;
let batch_stream = IndexedAsyncBatchStream::try_new(
cram_reader,
index_record.header.clone(),
config,
index_record.records.clone(),
)
.await?
.into_stream();
Ok(batch_stream.boxed())
}))
}
}