use std::sync::Arc;
use datafusion::{
datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener},
error::DataFusionError,
};
use exon_cram::{AsyncBatchStream, CRAMConfig};
use futures::{StreamExt, TryStreamExt};
use noodles::sam::Header;
use tokio_util::io::StreamReader;
/// A [`FileOpener`] implementation for CRAM files.
///
/// Holds the shared [`CRAMConfig`] whose object store is used to fetch the
/// file contents when a file is opened.
#[derive(Debug)]
pub struct CRAMOpener {
/// Shared CRAM configuration; cloned (by reference count) into each open future.
config: Arc<CRAMConfig>,
}
impl CRAMOpener {
pub fn new(config: Arc<CRAMConfig>) -> Self {
Self { config }
}
}
impl FileOpener for CRAMOpener {
    /// Returns a future that opens the CRAM object at `file_meta`'s location
    /// and resolves to a boxed stream of record batches.
    ///
    /// The future fetches the object from the configured object store, reads
    /// the CRAM file definition and file header, parses the header text into
    /// a SAM [`Header`], and hands the positioned reader to
    /// [`AsyncBatchStream`].
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error when the object cannot be
    /// fetched, when the file definition or header cannot be read, when the
    /// header text fails to parse (reported as `DataFusionError::Execution`
    /// with the object location and the underlying parse error), or when the
    /// batch stream cannot be constructed.
    fn open(&self, file_meta: FileMeta) -> datafusion::error::Result<FileOpenFuture> {
        // The future must own its configuration, so bump the reference count
        // here rather than borrowing from `self`.
        let config = Arc::clone(&self.config);

        Ok(Box::pin(async move {
            let get_result = config.object_store.get(file_meta.location()).await?;

            // Adapt the object store's byte stream into an `AsyncRead` for noodles.
            let byte_stream = get_result.into_stream().map_err(DataFusionError::from);
            let stream_reader = StreamReader::new(Box::pin(byte_stream));
            let mut cram_reader = noodles::cram::AsyncReader::new(stream_reader);

            // The file definition precedes the header and must be consumed first.
            cram_reader.read_file_definition().await?;
            let raw_header = cram_reader.read_file_header().await?;

            // Keep the parser's own message and the file location so a failure
            // can be traced to a specific object and cause.
            let header = raw_header.parse::<Header>().map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to parse CRAM header for {}: {}",
                    file_meta.location(),
                    e
                ))
            })?;

            let object_store = Arc::clone(&config.object_store);
            let batch_stream = AsyncBatchStream::try_new(cram_reader, object_store, header, config)
                .await?
                .into_stream();

            Ok(batch_stream.boxed())
        }))
    }
}