use std::{fmt::Debug, str::FromStr, sync::Arc};
use arrow::datatypes::Field;
use async_trait::async_trait;
use datafusion::{
datasource::{
file_format::file_compression_type::FileCompressionType,
listing::{ListingTableConfig, ListingTableUrl},
physical_plan::FileScanConfig,
},
execution::runtime_env::RuntimeEnv,
physical_plan::ExecutionPlan,
};
use exon_fasta::SequenceDataType;
use noodles::core::Region;
use object_store::{path::Path, ObjectStore};
use crate::{
error::ExonError,
physical_plan::object_store::{parse_url, url_to_object_store_url},
};
/// Per-format options used when building an Exon listing table.
///
/// Implementors must also be `Default`, `Send`, `Sync` and `Debug` (see the
/// supertrait list).
#[async_trait]
pub trait ExonListingOptions: Default + Send + Sync + Debug {
    /// Partition columns of the table, as Arrow fields.
    fn table_partition_cols(&self) -> &[Field];

    /// File extension associated with this format.
    fn file_extension(&self) -> &str;

    /// Compression applied to the files.
    ///
    /// Unless overridden, this reports [`FileCompressionType::UNCOMPRESSED`].
    fn file_compression_type(&self) -> FileCompressionType {
        FileCompressionType::UNCOMPRESSED
    }

    /// Builds the execution plan that scans the files described by `conf`.
    ///
    /// # Errors
    ///
    /// Returns a DataFusion error if the implementor cannot build the plan.
    async fn create_physical_plan(
        &self,
        conf: FileScanConfig,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>>;
}
/// Listing options for formats that can be scanned restricted to genomic regions.
#[async_trait]
pub trait ExonIndexedListingOptions: ExonListingOptions {
    /// Whether these options describe an indexed scan
    /// (NOTE(review): exact meaning is defined by implementors — confirm).
    fn indexed(&self) -> bool;

    /// Regions configured on the options themselves.
    fn regions(&self) -> &[Region];

    /// Concatenates the configured regions with `regions`.
    ///
    /// The configured regions come first, followed by `regions` in their given
    /// order. No merging or deduplication is performed.
    fn coalesce_regions(&self, regions: Vec<Region>) -> Vec<Region> {
        let configured = self.regions().iter().cloned();
        configured.chain(regions).collect()
    }

    /// Builds the execution plan for `conf`, restricted to `region`.
    ///
    /// # Errors
    ///
    /// Returns a DataFusion error if the implementor cannot build the plan.
    async fn create_physical_plan_with_regions(
        &self,
        conf: FileScanConfig,
        region: Vec<Region>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>>;
}
#[async_trait]
pub trait ExonFileIndexedListingOptions: ExonIndexedListingOptions {
fn region_file(&self) -> crate::Result<&str>;
async fn get_regions_from_file(
&self,
runtime_env: &Arc<RuntimeEnv>,
) -> crate::Result<Vec<Region>> {
let region_file = self.region_file()?;
let region_url = parse_url(region_file)?;
let object_store_url = url_to_object_store_url(®ion_url)?;
let object_store = runtime_env.object_store(object_store_url)?;
let region_bytes = object_store
.get(&Path::from_url_path(region_url.path())?)
.await?
.bytes()
.await?;
let regions = std::str::from_utf8(®ion_bytes)
.map_err(|e| ExonError::ExecutionError(format!("Error parsing region file: {}", e)))?
.lines()
.map(|line| {
let line = line.trim();
let region = Region::from_str(line).unwrap();
Ok(region)
})
.collect::<crate::Result<Vec<_>>>()?;
Ok(regions)
}
}
/// Options describing how sequence data is represented and buffered.
#[async_trait]
pub trait ExonSequenceDataTypeOptions {
    /// Capacity to use for sequence buffers
    /// (NOTE(review): unit — bytes vs. records — is defined by implementors; confirm).
    fn sequence_buffer_capacity(&self) -> usize;

    /// Representation to use for sequence data.
    fn sequence_data_type(&self) -> &SequenceDataType;
}
/// A DataFusion listing-table configuration paired with Exon-specific options.
///
/// The options are held behind an [`Arc`], so cloning the config shares them.
///
/// `Clone` is implemented by hand instead of derived. `#[derive(Clone)]` would
/// require `T: Clone`, even though `Arc<T>` is cloneable for any `T`. Option types
/// only need `Default + Send + Sync + Debug`, so the derived bound made configs
/// with non-`Clone` options un-cloneable.
#[derive(Debug)]
pub struct ExonListingConfig<T> {
    /// The underlying DataFusion listing-table configuration.
    pub inner: ListingTableConfig,
    /// Format-specific options, shared between clones of this config.
    pub options: Arc<T>,
}

impl<T> Clone for ExonListingConfig<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            // Reference-count bump only; `T` itself is never cloned.
            options: Arc::clone(&self.options),
        }
    }
}
impl<T> ExonListingConfig<T> {
    /// Creates a config for `table_path`, wrapping `options` in an [`Arc`].
    pub fn new_with_options(table_path: ListingTableUrl, options: T) -> Self {
        let inner = ListingTableConfig::new(table_path);
        let options = Arc::new(options);
        Self { inner, options }
    }

    /// Returns the first table path of the inner config, or `None` if it has none.
    pub fn first_table_path(&self) -> Option<&ListingTableUrl> {
        let mut paths = self.inner.table_paths.iter();
        paths.next()
    }
}