use std::sync::Arc;
use datafusion::{
datasource::{
file_format::file_compression_type::FileCompressionType, physical_plan::FileOpener,
},
error::DataFusionError,
};
use exon_sdf::SDFConfig;
use futures::{StreamExt, TryStreamExt};
use tokio_util::io::StreamReader;
/// A [`FileOpener`] implementation that turns a single SDF file location into
/// a stream of record batches.
///
/// The opener itself holds no per-file state: everything it needs is copied
/// or reference-count-cloned into the future returned by `open`.
#[derive(Debug)]
pub struct SDFOpener {
/// Compression type whose `convert_stream` is applied to the raw byte
/// stream before it is handed to the SDF batch reader.
file_compression_type: FileCompressionType,
/// Shared SDF configuration. `open` reads its `object_store` to fetch the
/// file and passes the configuration on to `exon_sdf::BatchReader`.
config: Arc<SDFConfig>,
}
impl SDFOpener {
pub fn new(config: Arc<SDFConfig>, file_compression_type: FileCompressionType) -> Self {
Self {
config,
file_compression_type,
}
}
}
impl FileOpener for SDFOpener {
    /// Returns a future that opens the file described by `file_meta` and
    /// resolves to a boxed stream produced by `exon_sdf::BatchReader`.
    ///
    /// The future performs these steps in order:
    /// 1. fetches the object at `file_meta.location()` from the configured
    ///    object store,
    /// 2. converts the resulting byte stream's errors into [`DataFusionError`],
    /// 3. passes the bytes through `FileCompressionType::convert_stream`,
    /// 4. wraps the result in a [`StreamReader`] and feeds it to the SDF
    ///    batch reader.
    ///
    /// # Errors
    ///
    /// Calling `open` itself always returns `Ok`. The returned future resolves
    /// to an error if the object store `get` fails or if `convert_stream`
    /// rejects the stream.
    fn open(
        &self,
        file_meta: datafusion::datasource::physical_plan::FileMeta,
    ) -> datafusion::error::Result<datafusion::datasource::physical_plan::FileOpenFuture> {
        // The future is `async move`, so it must own its inputs rather than
        // borrow from `self`.
        let compression = self.file_compression_type;
        let sdf_config = Arc::clone(&self.config);

        Ok(Box::pin(async move {
            // Raw bytes of the object, with object-store errors mapped into
            // DataFusion's error type so `convert_stream` accepts the stream.
            let raw_bytes = sdf_config
                .object_store
                .get(file_meta.location())
                .await?
                .into_stream()
                .map_err(DataFusionError::from)
                .boxed();

            // Adapt the (possibly converted) byte stream to an async reader.
            let reader = StreamReader::new(compression.convert_stream(raw_bytes)?);

            let batches = exon_sdf::BatchReader::new(reader, sdf_config).into_stream();
            Ok(batches.boxed())
        }))
    }
}