use std::{any::Any, sync::Arc};
use arrow::datatypes::SchemaRef;
use datafusion::{
common::Statistics,
datasource::physical_plan::{FileScanConfig, FileStream},
physical_plan::{
metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan,
PlanProperties, SendableRecordBatchStream,
},
};
use exon_bigwig::zoom_batch_reader::BigWigZoomConfig;
use noodles::core::Region;
use crate::datasources::ExonFileScanConfig;
use super::file_opener::FileOpener;
/// Physical plan node that scans BigWig zoom data at a fixed reduction level.
///
/// The node has no children; it produces record batches by opening the files
/// described by `base_config` through a `FileStream`.
#[derive(Debug)]
pub struct Scanner {
/// File scan configuration supplied to `Scanner::new`; provides the object
/// store URL, the file schema and the file projection used in `execute`.
base_config: FileScanConfig,
/// Schema obtained from `project_with_properties`; returned by `schema()`.
projected_schema: SchemaRef,
/// Metrics set handed to every `FileStream` created in `execute`.
metrics: ExecutionPlanMetricsSet,
/// Optional region forwarded to the zoom reader config as its interval.
/// `None` unless set through `with_some_region_filter`.
region_filter: Option<Region>,
/// Reduction level forwarded to the zoom reader config.
reduction_level: u32,
/// Plan properties obtained from `project_with_properties`.
properties: PlanProperties,
/// Statistics obtained from `project_with_properties`.
statistics: Statistics,
}
impl Scanner {
    /// Creates a scanner over `base_config` that reads zoom data at
    /// `reduction_level`.
    ///
    /// The projected schema, statistics and plan properties are taken from
    /// `base_config.project_with_properties()`. The scanner starts with a
    /// fresh metrics set and no region filter.
    pub fn new(base_config: FileScanConfig, reduction_level: u32) -> Self {
        let (schema, stats, plan_properties) = base_config.project_with_properties();

        Self {
            region_filter: None,
            metrics: ExecutionPlanMetricsSet::new(),
            projected_schema: schema,
            statistics: stats,
            properties: plan_properties,
            reduction_level,
            base_config,
        }
    }

    /// Returns the scanner with its region filter replaced by `region`.
    ///
    /// Passing `None` clears any previously set filter.
    pub fn with_some_region_filter(self, region: Option<Region>) -> Self {
        Self {
            region_filter: region,
            ..self
        }
    }
}
impl DisplayAs for Scanner {
    /// Writes the `BigWigScanner ` prefix and then delegates to the base
    /// file scan configuration's own `fmt_as` for the requested format type.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("BigWigScanner ")?;
        self.base_config.fmt_as(t, f)
    }
}
impl ExecutionPlan for Scanner {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"BigWigScanner"
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.projected_schema)
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn statistics(&self) -> datafusion::error::Result<Statistics> {
Ok(self.statistics.clone())
}
fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::context::TaskContext>,
) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;
let batch_size = context.session_config().batch_size();
let config = BigWigZoomConfig::new_with_schema(
object_store,
Arc::clone(&self.base_config.file_schema),
)
.with_batch_size(batch_size)
.with_reduction_level(self.reduction_level)
.with_some_interval(self.region_filter.clone())
.with_some_projection(Some(self.base_config.file_projection()));
let opener = FileOpener::new(Arc::new(config));
let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}
}