use std::{any::Any, sync::Arc};
use crate::{
datasources::{hive_partition::filter_matches_partition_cols, ExonFileType},
error::Result,
physical_plan::{
file_scan_config_builder::FileScanConfigBuilder, infer_region,
object_store::pruned_partition_list,
},
};
use arrow::datatypes::{Field, SchemaRef};
use async_trait::async_trait;
use datafusion::{
catalog::Session,
datasource::{
file_format::file_compression_type::FileCompressionType,
listing::{ListingTableConfig as DataFusionListingTableConfig, ListingTableUrl},
physical_plan::FileScanConfig,
TableProvider,
},
error::Result as DataFusionResult,
logical_expr::{TableProviderFilterPushDown, TableType},
physical_plan::ExecutionPlan,
prelude::Expr,
};
use exon_bigwig::zoom_batch_reader::SchemaBuilder;
use exon_common::TableSchema;
use futures::TryStreamExt;
use noodles::core::Region;
use super::scanner::Scanner;
/// Configuration used to build a [`ListingTable`]: a DataFusion listing-table
/// config paired with the options specific to this table type.
#[derive(Debug, Clone)]
pub struct ListingTableConfig {
/// The wrapped DataFusion listing-table configuration, created from the table path.
inner: DataFusionListingTableConfig,
/// Options (file extension, partition columns, reduction level, region) for the table.
options: ListingTableOptions,
}
impl ListingTableConfig {
    /// Builds a configuration for the table located at `table_path`, carrying
    /// along the supplied `options`.
    pub fn new(table_path: ListingTableUrl, options: ListingTableOptions) -> Self {
        // Wrap the path in DataFusion's own config first, then pair it with our options.
        let inner = DataFusionListingTableConfig::new(table_path);

        Self { inner, options }
    }
}
/// Options controlling how a [`ListingTable`] lists and scans its files.
#[derive(Debug, Clone)]
pub struct ListingTableOptions {
/// File extension passed to the partition listing when scanning.
file_extension: String,
/// Partition columns; added to the inferred schema and used for filter pushdown and pruning.
table_partition_cols: Vec<Field>,
/// Reduction level handed to the `Scanner` when a physical plan is created.
reduction_level: u32,
/// Optional region used by `scan` when no region can be derived from the query filters.
region: Option<Region>,
}
impl ListingTableOptions {
    /// Creates options for the given `reduction_level`.
    ///
    /// The file extension is taken from `ExonFileType::BigWigZoom` with
    /// `FileCompressionType::UNCOMPRESSED`; the partition columns start empty
    /// and no region is set.
    pub fn new(reduction_level: u32) -> Self {
        let file_extension =
            ExonFileType::BigWigZoom.get_file_extension(FileCompressionType::UNCOMPRESSED);

        Self {
            file_extension,
            table_partition_cols: Vec::new(),
            reduction_level,
            region: None,
        }
    }

    /// Returns these options with `region` set, replacing any previous region.
    pub fn with_region(self, region: Region) -> Self {
        Self {
            region: Some(region),
            ..self
        }
    }

    /// Returns these options with the partition columns replaced by
    /// `table_partition_cols`.
    pub fn with_table_partition_cols(self, table_partition_cols: Vec<Field>) -> Self {
        Self {
            table_partition_cols,
            ..self
        }
    }

    /// Builds the table schema from a default `SchemaBuilder` plus the
    /// configured partition columns.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` is part of
    /// the public signature.
    pub fn infer_schema(&self) -> datafusion::error::Result<TableSchema> {
        let mut schema_builder = SchemaBuilder::default();
        schema_builder.add_partition_fields(self.table_partition_cols.clone());

        Ok(schema_builder.build())
    }

    /// Creates a `Scanner` plan over `conf` at the configured reduction level,
    /// without a region filter.
    async fn create_physical_plan(
        &self,
        conf: FileScanConfig,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // `conf` is owned and not used afterwards, so it is moved rather than cloned.
        let scan = Scanner::new(conf, self.reduction_level);

        Ok(Arc::new(scan))
    }

    /// Creates a `Scanner` plan over `conf` at the configured reduction level,
    /// with `region` installed as the scanner's region filter.
    async fn create_physical_plan_with_region(
        &self,
        conf: FileScanConfig,
        region: Region,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // `conf` is owned and not used afterwards, so it is moved rather than cloned.
        let scan =
            Scanner::new(conf, self.reduction_level).with_some_region_filter(Some(region));

        Ok(Arc::new(scan))
    }
}
/// A table provider backed by a listing of files under one or more table paths.
#[derive(Debug, Clone)]
pub struct ListingTable {
/// Table locations taken from the config; `scan` currently reads only the first entry.
table_paths: Vec<ListingTableUrl>,
/// Schema of the table; supplies both the table schema and the file schema.
table_schema: TableSchema,
/// Listing and scan options for this table.
options: ListingTableOptions,
}
impl ListingTable {
pub fn try_new(config: ListingTableConfig, table_schema: TableSchema) -> Result<Self> {
Ok(Self {
table_paths: config.inner.table_paths,
table_schema,
options: config.options,
})
}
}
#[async_trait]
impl TableProvider for ListingTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema.table_schema())
}
fn table_type(&self) -> TableType {
TableType::Base
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
Ok(filters
.iter()
.map(|f| match f {
Expr::ScalarFunction(scalar) => {
if scalar.name() == "bigwig_region_filter" {
TableProviderFilterPushDown::Exact
} else {
filter_matches_partition_cols(f, &self.options.table_partition_cols)
}
}
_ => filter_matches_partition_cols(f, &self.options.table_partition_cols),
})
.collect())
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let object_store_url = if let Some(url) = self.table_paths.first() {
url.object_store()
} else {
todo!()
};
let object_store = state.runtime_env().object_store(object_store_url.clone())?;
let file_list = pruned_partition_list(
&object_store,
&self.table_paths[0],
filters,
self.options.file_extension.as_str(),
&self.options.table_partition_cols,
)
.await?
.try_collect::<Vec<_>>()
.await?;
let file_schema = self.table_schema.file_schema()?;
let file_scan_config =
FileScanConfigBuilder::new(object_store_url.clone(), file_schema, vec![file_list])
.projection_option(projection.cloned())
.limit_option(limit)
.table_partition_cols(self.options.table_partition_cols.clone())
.build();
let regions = filters
.iter()
.map(|f| {
if let Expr::ScalarFunction(s) = f {
let r = infer_region::infer_region_from_udf(s, "bigwig_region_filter")?;
Ok(r)
} else {
Ok(None)
}
})
.collect::<crate::Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect::<Vec<_>>();
if regions.is_empty() {
if let Some(region) = self.options.region.clone() {
tracing::info!("Creating physical plan with region: {:?}", region);
let plan = self
.options
.create_physical_plan_with_region(file_scan_config, region)
.await?;
Ok(plan)
} else {
tracing::info!("Creating physical plan without region");
let plan = self.options.create_physical_plan(file_scan_config).await?;
Ok(plan)
}
} else if regions.len() == 1 {
tracing::info!(
"Creating physical plan with region: {:?}",
regions.first().unwrap()
);
let plan = self
.options
.create_physical_plan_with_region(
file_scan_config.clone(),
regions.first().unwrap().clone(),
)
.await?;
Ok(plan)
} else {
let mut plans = Vec::new();
for region in regions {
let plan = self
.options
.create_physical_plan_with_region(file_scan_config.clone(), region)
.await?;
plans.push(plan);
}
let plan = datafusion::physical_plan::union::UnionExec::new(plans);
Ok(Arc::new(plan))
}
}
}