use std::{any::Any, collections::HashMap, sync::Arc};
use arrow::datatypes::{Field, SchemaRef};
use async_trait::async_trait;
use datafusion::{
catalog::Session,
common::Statistics,
datasource::{
listing::{ListingTableConfig, ListingTableUrl},
physical_plan::FileScanConfig,
TableProvider,
},
error::{DataFusionError, Result as DataFusionResult},
logical_expr::{Expr, TableProviderFilterPushDown, TableType},
physical_plan::ExecutionPlan,
};
use exon_common::TableSchema;
use exon_cram::ObjectStoreFastaRepositoryAdapter;
use exon_sam::SAMSchemaBuilder;
use futures::{StreamExt, TryStreamExt};
use noodles::{core::Region, sam::Header};
use object_store::{ObjectMeta, ObjectStore};
use tokio_util::io::StreamReader;
use crate::{
datasources::hive_partition::filter_matches_partition_cols,
error::{ExonError, Result as ExonResult},
physical_plan::{
file_scan_config_builder::FileScanConfigBuilder, infer_region,
object_store::pruned_partition_list,
},
};
use super::{
index::augment_file_with_crai_record_chunks, indexed_scanner::IndexedCRAMScan,
scanner::CRAMScan,
};
const CRAM_EXTENSION: &str = "cram";
/// Configuration for creating a [`ListingCRAMTable`]: the listing-table
/// location(s) paired with CRAM-specific options.
#[derive(Debug, Clone)]
pub struct ListingCRAMTableConfig {
    /// Underlying DataFusion listing-table configuration (holds the table paths).
    inner: ListingTableConfig,
    /// CRAM-specific listing options (reference, indexing, partitioning).
    options: ListingCRAMTableOptions,
}
impl ListingCRAMTableConfig {
    /// Build a configuration from a table URL plus CRAM listing options.
    pub fn new(table_path: ListingTableUrl, options: ListingCRAMTableOptions) -> Self {
        let inner = ListingTableConfig::new(table_path);

        Self { inner, options }
    }
}
/// Options controlling how a CRAM listing table is scanned and how its
/// schema is inferred.
#[derive(Debug, Clone, Default)]
pub struct ListingCRAMTableOptions {
    /// Hive-style partition columns for the table.
    table_partition_cols: Vec<Field>,
    /// Optional FASTA reference used to decode CRAM records.
    fasta_reference: Option<String>,
    /// If true, infer the tag data type as a struct by reading a record
    /// from the first file; otherwise use the default SAM schema.
    tag_as_struct: bool,
    /// If true, scans are CRAI-index-backed and require a region.
    indexed: bool,
    /// Optional default region used when no region filter expression is supplied.
    region: Option<Region>,
}
impl TryFrom<&HashMap<String, String>> for ListingCRAMTableOptions {
    type Error = ExonError;

    /// Build options from string key/value settings.
    ///
    /// Recognized keys: `format.fasta_reference` (path to a FASTA
    /// reference) and `format.indexed` (`"true"` enables indexed scans).
    fn try_from(options: &HashMap<String, String>) -> Result<Self, ExonError> {
        let fasta_reference = options.get("format.fasta_reference").cloned();

        let indexed = matches!(options.get("format.indexed"), Some(v) if v == "true");

        Ok(Self::default()
            .with_fasta_reference(fasta_reference)
            .with_indexed(indexed))
    }
}
impl ListingCRAMTableOptions {
    /// Set the FASTA reference used to decode CRAM records.
    pub fn with_fasta_reference(mut self, fasta_reference: Option<String>) -> Self {
        self.fasta_reference = fasta_reference;
        self
    }

    /// Enable or disable CRAI-index-backed scans.
    pub fn with_indexed(mut self, indexed: bool) -> Self {
        self.indexed = indexed;
        self
    }

    /// Control whether the tag data type is inferred as a struct from file data.
    pub fn with_tag_as_struct(mut self, tag_as_struct: bool) -> Self {
        self.tag_as_struct = tag_as_struct;
        self
    }

    /// Set the hive-style partition columns.
    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Set a default region used when no region filter expression is supplied.
    pub fn with_region(mut self, region: Option<Region>) -> Self {
        self.region = region;
        self
    }

    /// Infer the table schema from the first object in `objects`.
    ///
    /// When `tag_as_struct` is false this returns the default SAM schema
    /// (plus partition fields) without touching the store. Otherwise the
    /// first CRAM file is opened, its header parsed, and one record read so
    /// the tag data type can be inferred from actual data.
    ///
    /// # Errors
    ///
    /// Returns an error when `objects` is empty, the store read fails, the
    /// header cannot be parsed, or the file contains no records.
    async fn infer_schema_from_object_meta(
        &self,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> ExonResult<TableSchema> {
        if objects.is_empty() {
            return Err(ExonError::ExecutionError("No objects found".to_string()));
        }

        // Fast path: no per-file inference required.
        if !self.tag_as_struct {
            let builder = SAMSchemaBuilder::default()
                .with_partition_fields(self.table_partition_cols.clone());

            let table_schema = builder.build();

            return Ok(table_schema);
        }

        let get_result = store.get(&objects[0].location).await?;

        let stream_reader = Box::pin(get_result.into_stream().map_err(ExonError::from));
        let stream_reader = StreamReader::new(stream_reader);

        // Use the configured FASTA reference when present; otherwise fall
        // back to an empty repository.
        let reference_sequence_repository = match &self.fasta_reference {
            Some(reference) => {
                let object_store_adapter = ObjectStoreFastaRepositoryAdapter::try_new(
                    Arc::clone(store),
                    reference.to_string(),
                )
                .await?;

                noodles::fasta::Repository::new(object_store_adapter)
            }
            None => noodles::fasta::Repository::default(),
        };

        let mut cram_reader = noodles::cram::r#async::io::reader::Builder::default()
            .set_reference_sequence_repository(reference_sequence_repository)
            .build_from_reader(stream_reader);

        cram_reader.read_file_definition().await?;

        let header = cram_reader.read_file_header().await?;
        // The header text is already owned here, so parse it directly
        // (previously it was needlessly cloned via `to_owned`); this also
        // matches the parse call in the indexed scan path.
        let header: Header = header
            .parse()
            .map_err(|_| DataFusionError::Execution("Unable to parse header".to_string()))?;

        let mut schema_builder = SAMSchemaBuilder::default();

        // A record is required to infer the tag struct data type.
        if let Some(Ok(record)) = cram_reader.records(&header).next().await {
            schema_builder = schema_builder.with_tags_data_type_from_data(record.data())?;
        } else {
            return Err(ExonError::ExecutionError(
                "No records found in CRAM file".to_string(),
            ));
        }

        schema_builder = schema_builder.with_partition_fields(self.table_partition_cols.clone());

        Ok(schema_builder.build())
    }

    /// Infer the table schema by listing `.cram` files under `table_path`
    /// and delegating to [`Self::infer_schema_from_object_meta`].
    pub async fn infer_schema<'a>(
        &self,
        state: &dyn Session,
        table_path: &'a ListingTableUrl,
    ) -> ExonResult<TableSchema> {
        let store = state.runtime_env().object_store(table_path)?;

        let files = exon_common::object_store_files_from_table_path(
            &store,
            table_path.as_ref(),
            table_path.prefix(),
            CRAM_EXTENSION,
            None,
        )
        .await;

        let files = files
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Unable to get path info: {}", e)))?;

        self.infer_schema_from_object_meta(&store, &files).await
    }

    /// Create an execution plan for an index-backed (region-restricted) scan.
    async fn create_physical_plan_with_region(
        &self,
        conf: FileScanConfig,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        let scan = IndexedCRAMScan::new(conf, self.fasta_reference.clone());
        Ok(Arc::new(scan))
    }

    /// Create an execution plan for a full-file scan.
    async fn create_physical_plan(
        &self,
        conf: FileScanConfig,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        let scan = CRAMScan::new(conf, self.fasta_reference.clone());
        Ok(Arc::new(scan))
    }
}
/// A DataFusion table provider backed by a listing of CRAM files.
#[derive(Debug, Clone)]
pub struct ListingCRAMTable {
    /// URLs of the table roots; the first entry is used for scans.
    table_paths: Vec<ListingTableUrl>,
    /// Combined file + partition schema for the table.
    table_schema: TableSchema,
    /// CRAM-specific listing options.
    options: ListingCRAMTableOptions,
}
impl ListingCRAMTable {
pub fn try_new(config: ListingCRAMTableConfig, table_schema: TableSchema) -> ExonResult<Self> {
Ok(Self {
table_paths: config.inner.table_paths,
table_schema,
options: config.options,
})
}
}
#[async_trait]
impl TableProvider for ListingCRAMTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The full table schema (file fields plus partition columns).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema.table_schema())
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Classify each filter for pushdown.
    ///
    /// `cram_region_filter(...)` scalar-function calls are handled exactly by
    /// the scan; any other filter is pushed down only to the degree it
    /// matches the table's partition columns.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
        let pushdown = filters
            .iter()
            .map(|f| {
                if let Expr::ScalarFunction(s) = f {
                    if s.name() == "cram_region_filter" {
                        return Ok(TableProviderFilterPushDown::Exact);
                    }
                }

                let pt = filter_matches_partition_cols(f, &self.options.table_partition_cols);

                Ok(pt)
            })
            .collect::<DataFusionResult<Vec<_>>>()?;

        tracing::trace!(
            "for filters {:?}, returning pushdown {:?}",
            filters,
            pushdown,
        );

        Ok(pushdown)
    }

    /// Plan a scan of the table.
    ///
    /// Non-indexed tables scan every matching `.cram` file whole. Indexed
    /// tables require exactly one region — taken from a `cram_region_filter`
    /// expression, or falling back to the configured default region — and
    /// use each file's CRAI index to restrict the byte ranges read.
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // Only the first table path is scanned.
        let object_store_url = self.table_paths[0].object_store();
        let object_store = state.runtime_env().object_store(object_store_url.clone())?;

        if !self.options.indexed {
            // Non-indexed path: list files (pruned by partition filters)
            // and scan them in full.
            let file_list = pruned_partition_list(
                &object_store,
                &self.table_paths[0],
                filters,
                CRAM_EXTENSION,
                &self.options.table_partition_cols,
            )
            .await?
            .try_collect::<Vec<_>>()
            .await?;

            let file_schema = self.table_schema.file_schema()?;
            let file_scan_config =
                FileScanConfigBuilder::new(object_store_url.clone(), file_schema, vec![file_list])
                    .projection_option(projection.cloned())
                    .limit_option(limit)
                    .table_partition_cols(self.options.table_partition_cols.clone())
                    .build();

            let table = self.options.create_physical_plan(file_scan_config).await?;

            return Ok(table);
        }

        tracing::info!("for indexed CRAM, using filters: {:?}", filters);

        // Collect regions from any `cram_region_filter` UDF calls among
        // the filters; non-UDF filters contribute nothing.
        let regions = filters
            .iter()
            .map(|f| {
                if let Expr::ScalarFunction(s) = f {
                    let r = infer_region::infer_region_from_udf(s, "cram_region_filter")?;
                    Ok(r)
                } else {
                    Ok(None)
                }
            })
            .collect::<ExonResult<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        tracing::info!("regions: {:?}", regions);

        // Unless exactly one region came from the filters, fall back to the
        // table's configured default region (when one is set).
        let regions = if self.options.indexed {
            if regions.len() == 1 {
                regions
            } else {
                match self.options.region.clone() {
                    Some(region) => vec![region],
                    None => regions,
                }
            }
        } else {
            regions
        };

        if regions.is_empty() && self.options.indexed {
            return Err(DataFusionError::Plan(
                "An indexed CRAM table type requires a region filter. See the 'cram_region_filter' function.".to_string(),
            ));
        }

        if regions.len() > 1 {
            return Err(DataFusionError::Plan(
                "Only one region filter is supported".to_string(),
            ));
        }

        let mut file_list = pruned_partition_list(
            &object_store,
            &self.table_paths[0],
            filters,
            CRAM_EXTENSION,
            &self.options.table_partition_cols,
        )
        .await?;

        let mut file_partition_with_ranges = Vec::new();

        // Exactly one region remains at this point (guarded above).
        let region = regions[0].clone();

        // For each file: read its header, then use the CRAI index to turn
        // the region into byte-range-restricted partitioned files.
        while let Some(f) = file_list.next().await {
            let f = f?;

            let s = object_store.get(&f.object_meta.location).await?;
            let s = s.into_stream().map_err(DataFusionError::from);

            let stream_reader = Box::pin(s);
            let stream_reader = StreamReader::new(stream_reader);

            let mut cram_reader = noodles::cram::AsyncReader::new(stream_reader);
            cram_reader.read_file_definition().await?;

            let header = cram_reader.read_file_header().await?;
            let header: Header = header.parse().map_err(|_| {
                DataFusionError::Execution("Failed to parse CRAM header".to_string())
            })?;

            let file_byte_range = augment_file_with_crai_record_chunks(
                Arc::clone(&object_store),
                &header,
                &f,
                &region,
            )
            .await?;

            file_partition_with_ranges.extend(file_byte_range);
        }

        let file_scan_config = FileScanConfig {
            object_store_url: object_store_url.clone(),
            file_schema: self.table_schema.file_schema()?,
            file_groups: vec![file_partition_with_ranges],
            statistics: Statistics::new_unknown(self.table_schema.file_schema()?.as_ref()),
            projection: projection.cloned(),
            limit,
            output_ordering: Vec::new(),
            table_partition_cols: self.options.table_partition_cols.clone(),
        };

        let table = self
            .options
            .create_physical_plan_with_region(file_scan_config)
            .await?;

        return Ok(table);
    }
}