use std::sync::Arc;
use arrow::error::ArrowError;
use datafusion::{
datasource::{
file_format::file_compression_type::FileCompressionType,
physical_plan::{FileMeta, FileOpenFuture, FileOpener},
},
error::DataFusionError,
};
use exon_fasta::{BatchReader, FASTAConfig};
use futures::{StreamExt, TryStreamExt};
use tokio_util::io::StreamReader;
/// Opens FASTA files for DataFusion by implementing `FileOpener`.
///
/// Holds the shared FASTA configuration (which also provides the object store
/// used to fetch file contents) together with the compression type that is
/// applied to the raw byte stream before it is parsed.
pub struct FASTAOpener {
/// Shared FASTA configuration; cloned (by `Arc`) into every future produced by `open`.
config: Arc<FASTAConfig>,
/// Compression type used to convert the raw object-store byte stream before parsing.
file_compression_type: FileCompressionType,
}
impl FASTAOpener {
pub fn new(config: Arc<FASTAConfig>, file_compression_type: FileCompressionType) -> Self {
Self {
config,
file_compression_type,
}
}
}
impl FileOpener for FASTAOpener {
    /// Returns a future that fetches the file at `file_meta.location()` from
    /// the configured object store and resolves to a boxed stream of record
    /// batches parsed from it.
    ///
    /// Only the location of `file_meta` is consulted here; the object is
    /// requested with a plain `get` and streamed in full.
    ///
    /// # Errors
    ///
    /// This method itself always returns `Ok`. The returned future resolves
    /// to an error when the object-store request fails or when
    /// `FileCompressionType::convert_stream` rejects the byte stream. Errors
    /// raised later by the batch reader surface as `ArrowError` items of the
    /// resulting stream.
    fn open(&self, file_meta: FileMeta) -> datafusion::error::Result<FileOpenFuture> {
        // Copy what the future needs out of `self` so that the `async move`
        // block owns all of its captures.
        let fasta_config = Arc::clone(&self.config);
        let file_compression_type = self.file_compression_type;

        Ok(Box::pin(async move {
            let get_result = fasta_config.object_store.get(file_meta.location()).await?;

            // Object-store errors are mapped to `DataFusionError`, the error
            // type handed to `convert_stream`.
            let byte_stream = Box::pin(get_result.into_stream().map_err(DataFusionError::from));
            let converted_stream = file_compression_type.convert_stream(byte_stream)?;

            // Wrap the stream of byte chunks in a `StreamReader` before
            // passing it to the FASTA batch reader.
            let stream_reader = StreamReader::new(converted_stream);
            let batch_stream = BatchReader::new(stream_reader, fasta_config)
                .into_stream()
                .map_err(ArrowError::from);

            Ok(batch_stream.boxed())
        }))
    }
}