use std::{fmt::Debug, sync::Arc};
use crate::{
config::extract_config_from_state,
datasources::{exon_listing_table_options::ExonListingConfig, ScanFunction},
error::ExonError,
ExonRuntimeEnvExt,
};
use datafusion::{
datasource::{function::TableFunctionImpl, listing::ListingTableUrl, TableProvider},
error::{DataFusionError, Result},
execution::context::SessionContext,
logical_expr::Expr,
scalar::ScalarValue,
};
use exon_common::TableSchema;
use super::table_provider::{ListingBAMTable, ListingBAMTableOptions};
/// Table function that builds a BAM listing table from the arguments of a scan call.
///
/// Holds the session context that `call` uses to read the session state and
/// reach the runtime environment.
#[derive(Default)]
pub struct BAMScanFunction {
/// Session context the function reads state and the runtime environment from.
ctx: SessionContext,
}
impl Debug for BAMScanFunction {
    /// Writes only the type name; the wrapped session context is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits just the name, so
        // writing the name directly produces the same output.
        f.write_str("BAMScanFunction")
    }
}
impl BAMScanFunction {
pub fn new(ctx: SessionContext) -> Self {
Self { ctx }
}
}
impl TableFunctionImpl for BAMScanFunction {
    /// Builds a BAM listing table provider from the table-function arguments.
    ///
    /// `exprs` is converted into a [`ScanFunction`]; its `listing_table_url`
    /// is registered with the runtime environment, a schema is inferred for
    /// it, and the resulting [`ListingBAMTable`] is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the arguments cannot be converted into a
    /// [`ScanFunction`], if registering the object store URL fails, if the
    /// configuration cannot be extracted from the session state, or if schema
    /// inference fails.
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let listing_scan_function = ScanFunction::try_from(exprs)?;

        // `call` is a synchronous trait method, so async work is driven to
        // completion with `block_on`.
        // NOTE(review): `block_on` blocks the calling thread — confirm this is
        // never invoked from inside an async executor worker.
        futures::executor::block_on(async {
            self.ctx
                .runtime_env()
                .exon_register_object_store_url(listing_scan_function.listing_table_url.as_ref())
                .await
        })?;

        // Take a single snapshot of the session state and use it for both the
        // configuration lookup and schema inference, so the two steps agree
        // and the state is not cloned a second time.
        let state = self.ctx.state();
        let config = extract_config_from_state(&state)?;

        let options = ListingBAMTableOptions::default().with_tag_as_struct(config.bam_parse_tags);

        let schema = futures::executor::block_on(async {
            let schema = options
                .infer_schema(&state, &listing_scan_function.listing_table_url)
                .await?;

            Ok::<TableSchema, datafusion::error::DataFusionError>(schema)
        })?;

        let listing_table_config =
            ExonListingConfig::new_with_options(listing_scan_function.listing_table_url, options);

        let listing_table = ListingBAMTable::new(listing_table_config, schema);

        Ok(Arc::new(listing_table))
    }
}
/// Table function that builds a BAM listing table restricted to a single region.
///
/// Holds the session context that `call` uses to read the session state and
/// reach the runtime environment. Derives `Default` to match
/// `BAMScanFunction`.
#[derive(Default)]
pub struct BAMIndexedScanFunction {
    /// Session context the function reads state and the runtime environment from.
    ctx: SessionContext,
}
impl Debug for BAMIndexedScanFunction {
    /// Writes only the type name; the wrapped session context is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits just the name, so
        // writing the name directly produces the same output.
        f.write_str("BAMIndexedScanFunction")
    }
}
impl BAMIndexedScanFunction {
pub fn new(ctx: SessionContext) -> Self {
Self { ctx }
}
}
impl TableFunctionImpl for BAMIndexedScanFunction {
    /// Builds a region-restricted BAM listing table provider.
    ///
    /// Expects two string-literal arguments: the path to scan (first) and the
    /// region to restrict the scan to (second). Both arguments are validated
    /// before any object store registration or schema inference takes place.
    ///
    /// # Errors
    ///
    /// Returns an error if either argument is missing or is not a non-null
    /// UTF-8 literal, if the path or region cannot be parsed, if registering
    /// the object store URL fails, if the configuration cannot be extracted
    /// from the session state, or if schema inference fails.
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let Some(Expr::Literal(ScalarValue::Utf8(Some(path)))) = exprs.first() else {
            return Err(DataFusionError::Internal(
                "this function requires the path to be specified as the first argument".into(),
            ));
        };

        let listing_table_url = ListingTableUrl::parse(path)?;

        // Validate the region argument up front so that a malformed call is
        // rejected before any I/O is attempted.
        let Some(Expr::Literal(ScalarValue::Utf8(Some(region_str)))) = exprs.get(1) else {
            return Err(DataFusionError::Internal(
                "this function requires the region to be specified as the second argument".into(),
            ));
        };

        let region = region_str.parse().map_err(ExonError::from)?;

        // `call` is a synchronous trait method, so async work is driven to
        // completion with `block_on`.
        // NOTE(review): `block_on` blocks the calling thread — confirm this is
        // never invoked from inside an async executor worker.
        futures::executor::block_on(async {
            self.ctx
                .runtime_env()
                .exon_register_object_store_url(listing_table_url.as_ref())
                .await
        })?;

        // Take a single snapshot of the session state and use it for both the
        // configuration lookup and schema inference, so the two steps agree
        // and the state is not cloned a second time.
        let state = self.ctx.state();
        let config = extract_config_from_state(&state)?;

        let options = ListingBAMTableOptions::default()
            .with_regions(vec![region])
            .with_tag_as_struct(config.bam_parse_tags);

        let schema = futures::executor::block_on(async {
            let schema = options.infer_schema(&state, &listing_table_url).await?;

            Ok::<TableSchema, datafusion::error::DataFusionError>(schema)
        })?;

        let listing_table_config = ExonListingConfig::new_with_options(listing_table_url, options);
        let listing_table = ListingBAMTable::new(listing_table_config, schema);

        Ok(Arc::new(listing_table))
    }
}