use std::sync::Arc;
use datafusion::{
datasource::{
file_format::file_compression_type::FileCompressionType,
physical_plan::{FileMeta, FileOpenFuture, FileOpener},
},
error::DataFusionError,
};
use exon_genbank::{BatchReader, GenbankConfig};
use futures::{StreamExt, TryStreamExt};
/// Opener that turns a file listed by DataFusion into a stream of GenBank record batches.
pub struct GenbankOpener {
    /// Compression applied to the stored bytes; used to wrap the raw byte stream.
    file_compression_type: FileCompressionType,
    /// Shared reader configuration; also supplies the object store the file is read from.
    config: Arc<GenbankConfig>,
}
impl GenbankOpener {
    /// Builds an opener from a shared GenBank configuration and the compression
    /// type the underlying files are stored with.
    pub fn new(config: Arc<GenbankConfig>, file_compression_type: FileCompressionType) -> Self {
        Self {
            file_compression_type,
            config,
        }
    }
}
impl FileOpener for GenbankOpener {
    /// Opens the GenBank file described by `file_meta` and returns a future that
    /// resolves to a stream of record batches.
    ///
    /// The object is fetched from `config.object_store`, passed through
    /// `file_compression_type` to obtain the decompressed byte stream, and then
    /// fully buffered in memory before being handed to [`BatchReader`].
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error if the object cannot be fetched,
    /// if the decompression stream cannot be constructed (e.g. a compression type
    /// that DataFusion was not built to support — confirm against its features),
    /// or if reading any chunk of the object fails.
    fn open(&self, file_meta: FileMeta) -> datafusion::error::Result<FileOpenFuture> {
        let config = Arc::clone(&self.config);
        let file_compression_type = self.file_compression_type;

        Ok(Box::pin(async move {
            let get_result = config.object_store.get(file_meta.location()).await?;

            let byte_stream = Box::pin(get_result.into_stream().map_err(DataFusionError::from));

            // Propagate decompression-setup failures to the caller instead of
            // panicking the task.
            let decompressed = file_compression_type.convert_stream(byte_stream)?;

            // The whole decompressed object is buffered because `BatchReader` is
            // fed a `std::io` reader below. Concatenating whole chunks avoids
            // copying the data one byte at a time.
            let chunks = decompressed.try_collect::<Vec<_>>().await?;
            let contents: Vec<u8> = chunks.concat();

            let buf_reader = std::io::BufReader::new(std::io::Cursor::new(contents));
            let batch_stream = BatchReader::new(buf_reader, config).into_stream();

            Ok(batch_stream.boxed())
        }))
    }
}
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use datafusion::datasource::{
        file_format::file_compression_type::FileCompressionType,
        physical_plan::{FileMeta, FileOpener},
    };
    use exon_genbank::GenbankConfig;
    use exon_test::test_listing_table_dir;
    use futures::StreamExt;
    use object_store::{local::LocalFileSystem, ObjectStore};

    use crate::datasources::genbank::GenbankOpener;

    /// Opens the `genbank/test.gb` fixture through `GenbankOpener` (uncompressed)
    /// and checks that exactly one row is produced across all batches.
    #[tokio::test]
    async fn test_opener() {
        let store = Arc::new(LocalFileSystem::new());
        let genbank_config = GenbankConfig::new(Arc::<LocalFileSystem>::clone(&store));
        let opener = GenbankOpener::new(
            Arc::new(genbank_config),
            FileCompressionType::UNCOMPRESSED,
        );

        let fixture = test_listing_table_dir("genbank", "test.gb");
        let meta = store.head(&fixture).await.unwrap();

        let open_future = opener.open(FileMeta::from(meta)).unwrap();
        let mut batches = open_future.await.unwrap();

        let mut total_rows = 0;
        while let Some(result) = batches.next().await {
            total_rows += result.unwrap().num_rows();
        }

        assert_eq!(total_rows, 1);
    }
}