use std::{any::Any, sync::Arc};
use arrow::datatypes::SchemaRef;
use datafusion::{
common::Statistics,
datasource::physical_plan::{FileScanConfig, FileStream},
physical_plan::{
metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan,
PlanProperties, SendableRecordBatchStream,
},
};
use exon_bcf::BCFConfig;
use noodles::core::Region;
use crate::datasources::ExonFileScanConfig;
use super::file_opener::BCFOpener;
/// Physical plan node that scans BCF files described by a [`FileScanConfig`],
/// optionally restricted by a [`Region`] filter.
#[derive(Debug)]
pub struct BCFScan {
    /// Scan configuration; this node reads its object store URL, file schema,
    /// file groups and file projection.
    base_config: FileScanConfig,
    /// Schema reported by `schema()`, derived from `base_config` at construction.
    projected_schema: SchemaRef,
    /// Metrics set handed to the `FileStream` created in `execute`.
    metrics: ExecutionPlanMetricsSet,
    /// Optional region forwarded to the file opener; `None` means no region filter is set.
    region_filter: Option<Region>,
    /// Plan properties reported by `properties()`, derived from `base_config` at construction.
    properties: PlanProperties,
    /// Statistics reported by `statistics()`, derived from `base_config` at construction.
    statistics: Statistics,
}
impl BCFScan {
    /// Builds a scan over the files described by `base_config`, with no region filter.
    ///
    /// The projected schema, statistics and plan properties all come from
    /// `base_config.project_with_properties()`; a fresh, empty metrics set is created.
    pub fn new(base_config: FileScanConfig) -> Self {
        let metrics = ExecutionPlanMetricsSet::new();
        // Derive the projection while the config is still borrowed, before it
        // is moved into the struct below.
        let (schema, stats, plan_properties) = base_config.project_with_properties();

        Self {
            base_config,
            projected_schema: schema,
            metrics,
            region_filter: None,
            properties: plan_properties,
            statistics: stats,
        }
    }

    /// Returns this scan with `region_filter` set, replacing any previously set filter.
    pub fn with_region_filter(self, region_filter: Region) -> Self {
        Self {
            region_filter: Some(region_filter),
            ..self
        }
    }
}
impl DisplayAs for BCFScan {
    /// Writes a one-line description of this node: its name, the configured
    /// file groups and the file projection.
    ///
    /// The same text is produced for every [`DisplayFormatType`]; the format
    /// type argument is ignored.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Write straight into the formatter rather than building a temporary
        // `String` first; the rendered text is unchanged.
        //
        // NOTE(review): the `partitions=` label is followed by the file
        // projection, not partition information. The label is left as-is
        // because callers may depend on this exact text — confirm before renaming.
        write!(
            f,
            "BCFScanExec: files={:?}, partitions={:?}",
            self.base_config.file_groups,
            self.base_config.file_projection(),
        )
    }
}
impl ExecutionPlan for BCFScan {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"BCFScanExec"
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.projected_schema)
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn statistics(&self) -> datafusion::error::Result<Statistics> {
Ok(self.statistics.clone())
}
fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::context::TaskContext>,
) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;
let batch_size = context.session_config().batch_size();
let config = BCFConfig::new(object_store, Arc::clone(&self.base_config.file_schema))
.with_batch_size(batch_size)
.with_some_projection(Some(self.base_config.file_projection()));
let mut opener = BCFOpener::new(Arc::new(config));
if let Some(region) = &self.region_filter {
opener = opener.with_region_filter(region.clone());
}
let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}
}