use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPropagation,
};
/// Abstraction over a batch-producing source (e.g. files or in-memory data)
/// that a [`DataSourceExec`] node wraps and executes.
///
/// Implementations describe their output partitioning, equivalence
/// properties and statistics, and produce one record-batch stream per
/// partition via [`Self::open`].
pub trait DataSource: Send + Sync + Debug {
    /// Begin reading the given `partition`, returning a stream of record batches.
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
    /// Return `self` as [`Any`] so callers can downcast to a concrete source type.
    fn as_any(&self) -> &dyn Any;
    /// Format this source for `EXPLAIN` output in the requested style.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
    /// Attempt to redistribute this source's work across `_target_partitions`.
    ///
    /// The default returns `Ok(None)`, meaning the source cannot (or need
    /// not) be repartitioned; implementations may return a new source.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        Ok(None)
    }
    /// Partitioning scheme of this source's output.
    fn output_partitioning(&self) -> Partitioning;
    /// Equivalence/ordering properties of this source's output.
    fn eq_properties(&self) -> EquivalenceProperties;
    /// Statistics for the source as a whole (all partitions combined).
    fn statistics(&self) -> Result<Statistics>;
    /// Return a copy of this source limited to `_limit` rows, or `None` when
    /// the source does not support a fetch/limit.
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
    /// The currently configured fetch/limit, if any.
    fn fetch(&self) -> Option<usize>;
    /// Execution metrics for this source; the default is a fresh, empty set.
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        ExecutionPlanMetricsSet::new()
    }
    /// Try to push `_projection` down into the source, returning the
    /// replacement plan when the swap is possible.
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
    /// Try to push `filters` down into the source.
    ///
    /// The default reports every filter as unsupported (nothing pushed down).
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        Ok(FilterPushdownPropagation::unsupported(filters))
    }
}
/// Leaf [`ExecutionPlan`] that reads record batches from a [`DataSource`].
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    // The wrapped source that actually produces the data.
    data_source: Arc<dyn DataSource>,
    // Plan properties derived from `data_source`, cached at construction.
    cache: PlanProperties,
}
impl DisplayAs for DataSourceExec {
    /// Write a `DataSourceExec: ` prefix (omitted for tree rendering) and
    /// then delegate the rest of the formatting to the wrapped source.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        if !matches!(t, DisplayFormatType::TreeRender) {
            // Default and Verbose modes both carry the node-name prefix.
            write!(f, "DataSourceExec: ")?;
        }
        self.data_source.fmt_as(t, f)
    }
}
impl ExecutionPlan for DataSourceExec {
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// Leaf node: never has children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Ask the wrapped [`DataSource`] to redistribute its work across
    /// `target_partitions`, rebuilding the cached output partitioning when
    /// the source actually changes.
    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let data_source = self.data_source.repartitioned(
            target_partitions,
            config.optimizer.repartition_file_min_size,
            self.properties().eq_properties.output_ordering(),
        )?;
        if let Some(source) = data_source {
            // Changing the source partitioning invalidates the cached output
            // partitioning, so recompute it from the new source.
            let output_partitioning = source.output_partitioning();
            let plan = self
                .clone()
                .with_data_source(source)
                .with_partitioning(output_partitioning);
            Ok(Some(Arc::new(plan)))
        } else {
            // NOTE(review): this reports "changed" (`Some`) even though the
            // source was not repartitioned; `Ok(None)` would let callers keep
            // the existing node without allocating a clone — confirm callers
            // do not depend on receiving `Some` here before changing it.
            Ok(Some(Arc::new(self.clone())))
        }
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.data_source.open(partition, context)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.data_source.metrics().clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.data_source.statistics()
    }

    /// Statistics for a single partition when `partition` is `Some`,
    /// otherwise statistics for the whole source.
    ///
    /// Per-partition statistics are only available when the source is a
    /// [`FileScanConfig`]; any other source yields unknown statistics.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        // No specific partition requested: fall back to whole-source stats.
        // (Was `Ok(self.data_source.statistics()?)` — redundant re-wrap.)
        let Some(partition) = partition else {
            return self.data_source.statistics();
        };
        let mut statistics = Statistics::new_unknown(&self.schema());
        if let Some(file_config) =
            self.data_source.as_any().downcast_ref::<FileScanConfig>()
        {
            if let Some(file_group) = file_config.file_groups.get(partition) {
                if let Some(stat) = file_group.file_statistics(None) {
                    statistics = stat.clone();
                }
            }
        }
        Ok(statistics)
    }

    /// Return a copy of this node limited to `limit` rows, or `None` when
    /// the source does not support a fetch. Plan properties are unchanged by
    /// a fetch, so the existing cache is reused.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let data_source = self.data_source.with_fetch(limit)?;
        let cache = self.cache.clone();
        Some(Arc::new(Self { data_source, cache }))
    }

    fn fetch(&self) -> Option<usize> {
        self.data_source.fetch()
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.data_source.try_swapping_with_projection(projection)
    }

    /// Forward the parent filters to the wrapped [`DataSource`]; when the
    /// source accepts them it is replaced and the cached plan properties are
    /// recomputed from the new source.
    fn handle_child_pushdown_result(
        &self,
        child_pushdown_result: ChildPushdownResult,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        let res = self.data_source.try_pushdown_filters(
            child_pushdown_result.parent_filters.collect_all(),
            config,
        )?;
        // Map the updated source (if any) into an updated plan node; this
        // collapses the previously duplicated struct construction.
        let updated_node = res.updated_node.map(|data_source| {
            let mut new_node = self.clone();
            new_node.cache = Self::compute_properties(Arc::clone(&data_source));
            new_node.data_source = data_source;
            Arc::new(new_node) as Arc<dyn ExecutionPlan>
        });
        Ok(FilterPushdownPropagation {
            filters: res.filters,
            updated_node,
        })
    }
}
impl DataSourceExec {
    /// Wrap a concrete [`DataSource`] in a new, `Arc`'d [`DataSourceExec`].
    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
        Arc::new(Self::new(Arc::new(data_source)))
    }

    /// Create a new node over `data_source`, deriving its plan properties.
    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
        let cache = Self::compute_properties(Arc::clone(&data_source));
        Self { data_source, cache }
    }

    /// The wrapped [`DataSource`].
    pub fn data_source(&self) -> &Arc<dyn DataSource> {
        &self.data_source
    }

    /// Replace the wrapped source, recomputing the cached plan properties
    /// from the new source.
    pub fn with_data_source(self, data_source: Arc<dyn DataSource>) -> Self {
        let cache = Self::compute_properties(Arc::clone(&data_source));
        Self { data_source, cache }
    }

    /// Attach `constraints` to the cached plan properties.
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.cache = self.cache.with_constraints(constraints);
        self
    }

    /// Override the cached output partitioning.
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        self.cache = self.cache.with_partitioning(partitioning);
        self
    }

    /// Derive [`PlanProperties`] from the source's equivalence properties
    /// and partitioning; emission is incremental and the source is bounded.
    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
        PlanProperties::new(
            data_source.eq_properties(),
            data_source.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    /// Downcast the wrapped source to a [`FileScanConfig`] whose file source
    /// is a `T`, returning both on success and `None` otherwise.
    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
        let scan_config = self
            .data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()?;
        let file_source = scan_config.file_source().as_any().downcast_ref::<T>()?;
        Some((scan_config, file_source))
    }
}
/// Allow any concrete [`DataSource`] to be converted directly into a
/// [`DataSourceExec`] node (and, via the blanket impl, `.into()`).
impl<S> From<S> for DataSourceExec
where
    S: DataSource + 'static,
{
    fn from(source: S) -> Self {
        // Coerce the concrete source to a trait object, then build the node.
        let source: Arc<dyn DataSource> = Arc::new(source);
        Self::new(source)
    }
}