use std::sync::Arc;
use datafusion::{
datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener as FileOpenerTrait},
error::DataFusionError,
};
use futures::StreamExt;
use object_store::GetResultPayload;
/// DataFusion file opener that produces record batch streams from BigWig
/// files via `exon_bigwig`'s zoom batch reader.
pub struct FileOpener {
/// Shared zoom reader configuration. `open` reads its `object_store` to
/// fetch the file and hands a clone of the `Arc` to the batch reader.
config: Arc<exon_bigwig::zoom_batch_reader::BigWigZoomConfig>,
}
impl FileOpener {
pub fn new(config: Arc<exon_bigwig::zoom_batch_reader::BigWigZoomConfig>) -> Self {
Self { config }
}
}
impl FileOpenerTrait for FileOpener {
    /// Returns a future that opens `file_meta` as a boxed stream of record batches.
    ///
    /// The future fetches the object at `file_meta.location()` from the
    /// configured object store and, when the store hands back a local file,
    /// builds a `ZoomRecordBatchReader` over that file's path.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error when:
    /// - the object store request fails,
    /// - the store returns anything other than a local file payload,
    /// - the local path is not valid UTF-8, or
    /// - the zoom batch reader cannot be constructed.
    fn open(&self, file_meta: FileMeta) -> datafusion::error::Result<FileOpenFuture> {
        // The future must own its configuration, so take a new handle to it.
        let config = Arc::clone(&self.config);

        Ok(Box::pin(async move {
            let get_result = config.object_store.get(file_meta.location()).await?;

            match get_result.payload {
                GetResultPayload::File(_, path_buf) => {
                    // The reader is constructed from a string path. Convert
                    // strictly: a lossy conversion would replace invalid bytes
                    // and hand the reader a path that differs from the real one.
                    let path = path_buf.to_str().ok_or_else(|| {
                        DataFusionError::Execution(format!(
                            "BigWig file path is not valid UTF-8: {}",
                            path_buf.display()
                        ))
                    })?;

                    let batch_reader =
                        exon_bigwig::zoom_batch_reader::ZoomRecordBatchReader::try_new(
                            path,
                            Arc::clone(&config),
                        )?;

                    Ok(batch_reader.into_stream().boxed())
                }
                // Any non-file payload is rejected: the reader needs a path on disk.
                _ => Err(DataFusionError::Execution(
                    "BigWig file opener expected a file".to_string(),
                )),
            }
        }))
    }
}