use std::sync::Arc;
use arrow::error::ArrowError;
use datafusion::{
datasource::{
file_format::file_compression_type::FileCompressionType,
physical_plan::{FileMeta, FileOpenFuture, FileOpener},
},
error::DataFusionError,
};
use exon_fastq::{BatchReader, FASTQConfig};
use futures::{StreamExt, TryStreamExt};
use tokio::io::AsyncBufReadExt;
use tokio_util::io::StreamReader;
use crate::streaming_bgzf::is_bgzip_valid_header;
pub struct FASTQOpener {
config: Arc<FASTQConfig>,
file_compression_type: FileCompressionType,
}
impl FASTQOpener {
pub fn new(config: Arc<FASTQConfig>, file_compression_type: FileCompressionType) -> Self {
Self {
config,
file_compression_type,
}
}
}
impl FileOpener for FASTQOpener {
fn open(&self, file_meta: FileMeta) -> datafusion::error::Result<FileOpenFuture> {
let config = Arc::clone(&self.config);
let file_compression_type = self.file_compression_type;
Ok(Box::pin(async move {
match file_compression_type {
FileCompressionType::GZIP => {
let get_result = config.object_store.get(file_meta.location()).await?;
let stream = Box::pin(get_result.into_stream().map_err(DataFusionError::from));
let mut stream_reader = StreamReader::new(stream);
let buf = stream_reader.fill_buf().await?;
if is_bgzip_valid_header(buf) {
let get_result = config.object_store.get(file_meta.location()).await?;
let stream =
Box::pin(get_result.into_stream().map_err(DataFusionError::from));
let stream_reader = StreamReader::new(stream);
let bgzf_reader = noodles::bgzf::AsyncReader::new(stream_reader);
let batch_reader = BatchReader::new(bgzf_reader, config);
let batch_stream = batch_reader.into_stream().map_err(ArrowError::from);
Ok(batch_stream.boxed())
} else {
let get_result = config.object_store.get(file_meta.location()).await?;
let stream =
Box::pin(get_result.into_stream().map_err(DataFusionError::from));
let new_reader = file_compression_type.convert_stream(stream)?;
let buf_reader = StreamReader::new(new_reader);
let batch_reader = BatchReader::new(buf_reader, config);
let batch_stream = batch_reader.into_stream().map_err(ArrowError::from);
Ok(batch_stream.boxed())
}
}
_ => {
let get_result = config.object_store.get(file_meta.location()).await?;
let stream = Box::pin(get_result.into_stream().map_err(DataFusionError::from));
let new_reader = file_compression_type.convert_stream(stream)?;
let buf_reader = StreamReader::new(new_reader);
let batch_reader = BatchReader::new(buf_reader, config);
let batch_stream = batch_reader.into_stream().map_err(ArrowError::from);
Ok(batch_stream.boxed())
}
}
}))
}
}
#[cfg(test)]
mod test {}