use std::sync::Arc;
use datafusion::{
datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener},
error::DataFusionError,
};
use exon_bam::{BAMConfig, BatchReader};
use futures::{StreamExt, TryStreamExt};
use tokio_util::io::StreamReader;
/// A [`FileOpener`] implementation that opens a file location as a stream of
/// batches produced by [`BatchReader`].
pub struct BAMOpener {
/// Shared configuration. `open` reads the `object_store` field from it and
/// then hands the same `Arc` to `BatchReader::new`.
config: Arc<BAMConfig>,
}
impl BAMOpener {
pub fn new(config: Arc<BAMConfig>) -> Self {
Self { config }
}
}
impl FileOpener for BAMOpener {
    /// Returns a future that, when awaited, opens `file_meta.location()` and
    /// resolves to a boxed stream built from a [`BatchReader`].
    ///
    /// The call itself does no I/O: it only clones the shared configuration
    /// handle and packages the work into the returned future.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error if the object store `get`
    /// request fails or if `BatchReader::new` fails. Errors yielded by the
    /// underlying byte stream are converted with `DataFusionError::from`
    /// before being handed to the reader.
    fn open(&self, file_meta: FileMeta) -> datafusion::error::Result<FileOpenFuture> {
        // The future must own its own handle to the configuration, because it
        // is a `move` block that does not borrow from `self`.
        let bam_config = Arc::clone(&self.config);

        Ok(Box::pin(async move {
            // Fetch the object and expose its contents as a byte stream whose
            // error type is `DataFusionError`.
            let byte_stream = bam_config
                .object_store
                .get(file_meta.location())
                .await?
                .into_stream()
                .map_err(DataFusionError::from);

            // Pin the stream on the heap and adapt it into a reader.
            let bam_bytes = StreamReader::new(Box::pin(byte_stream));

            // The reader takes ownership of the configuration handle.
            let batch_reader = BatchReader::new(bam_bytes, bam_config).await?;

            Ok(batch_reader.into_stream().boxed())
        }))
    }
}