use std::sync::Arc;
use datafusion::{
datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener},
error::DataFusionError,
};
use exon_sam::{BatchReader, SAMConfig};
use futures::{StreamExt, TryStreamExt};
use tokio_util::io::StreamReader;
/// A DataFusion [`FileOpener`] that reads each file through an
/// `exon_sam` [`BatchReader`].
///
/// The opener only holds shared configuration. Every call to
/// [`FileOpener::open`] takes its own cheap `Arc` handle to it.
pub struct SAMOpener {
    /// Shared reader configuration. This includes the object store that
    /// files are fetched from.
    config: Arc<SAMConfig>,
}

impl SAMOpener {
    /// Builds an opener that uses `config` for every file it opens.
    pub fn new(config: Arc<SAMConfig>) -> Self {
        SAMOpener { config }
    }
}
impl FileOpener for SAMOpener {
    /// Returns a future that fetches the object at `file_meta.location()` and
    /// decodes it into a boxed stream of batches.
    ///
    /// The future does three things:
    /// 1. It downloads the object from `config.object_store`.
    /// 2. It adapts the object's byte stream into an async reader with
    ///    [`StreamReader`].
    /// 3. It hands that reader to [`BatchReader`].
    ///
    /// Only `file_meta.location()` is used. Any byte range carried by
    /// `file_meta` is ignored, so the whole object is read.
    ///
    /// # Errors
    ///
    /// The outer `Result` is always `Ok`. The future resolves to an error
    /// in two cases:
    /// - the object-store `get` fails;
    /// - constructing the [`BatchReader`] fails.
    fn open(&self, file_meta: FileMeta) -> datafusion::error::Result<FileOpenFuture> {
        // The future is 'static, so it needs its own handle to the shared config.
        let sam_config = Arc::clone(&self.config);

        Ok(Box::pin(async move {
            let object = sam_config.object_store.get(file_meta.location()).await?;

            // `StreamReader` needs the stream's error type to convert into
            // `std::io::Error`. Object-store errors are therefore routed
            // through `DataFusionError` first.
            let bytes = object.into_stream().map_err(DataFusionError::from);
            let reader = StreamReader::new(Box::pin(bytes));

            let batches = BatchReader::new(reader, sam_config).await?.into_stream();
            Ok(batches.boxed())
        }))
    }
}