use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_plan::execution_plan::{
Boundedness, EmissionType, SchedulingType,
};
use datafusion_physical_plan::metrics::SplitMetrics;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::stream::BatchSplitStream;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use itertools::Itertools;
use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
};
/// Abstraction over a partitioned source of record batches (e.g. files,
/// in-memory tables) that [`DataSourceExec`] wraps to expose as an
/// `ExecutionPlan` leaf node.
pub trait DataSource: Send + Sync + Debug {
    /// Opens a stream of record batches for the given partition.
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;

    /// Returns `self` as [`Any`] so callers can downcast to the concrete source.
    fn as_any(&self) -> &dyn Any;

    /// Formats this source for `EXPLAIN` output in the requested format.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;

    /// Attempts to redistribute this source across `_target_partitions`
    /// partitions, returning a new source on success.
    ///
    /// Default: `Ok(None)`, i.e. repartitioning is not supported.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
    ) -> Result<Option<Arc<dyn DataSource>>> {
        Ok(None)
    }

    /// Output partitioning of this source.
    fn output_partitioning(&self) -> Partitioning;

    /// Equivalence properties (orderings, constant columns, ...) of the output.
    fn eq_properties(&self) -> EquivalenceProperties;

    /// How this source interacts with the scheduler.
    /// Default: [`SchedulingType::NonCooperative`].
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::NonCooperative
    }

    /// Statistics for one partition (`Some(i)`) or aggregated over all
    /// partitions (`None`).
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;

    /// Returns a copy of this source with the row-limit applied, or `None`
    /// if a limit cannot be pushed into the source.
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;

    /// The currently configured row limit, if any.
    fn fetch(&self) -> Option<usize>;

    /// Metrics recorded by this source.
    /// Default: a fresh, empty metrics set.
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        ExecutionPlanMetricsSet::new()
    }

    /// Attempts to absorb a projection into the scan itself, returning the
    /// projected source on success.
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn DataSource>>>;

    /// Attempts to push filter predicates into the scan.
    ///
    /// Default: reports every filter as not pushed down ([`PushedDown::No`])
    /// and leaves the source unchanged.
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            vec![PushedDown::No; filters.len()],
        ))
    }

    /// Attempts to push a required sort order into the scan.
    /// Default: [`SortOrderPushdownResult::Unsupported`].
    fn try_pushdown_sort(
        &self,
        _order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
        Ok(SortOrderPushdownResult::Unsupported)
    }

    /// Returns a copy of this source configured to preserve (or not preserve)
    /// its input ordering, or `None` if the setting is not supported.
    fn with_preserve_order(&self, _preserve_order: bool) -> Option<Arc<dyn DataSource>> {
        None
    }
}
/// Leaf [`ExecutionPlan`] that adapts a [`DataSource`] into the physical
/// plan tree, caching its plan properties.
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    /// The wrapped source that actually produces record batches.
    data_source: Arc<dyn DataSource>,
    /// Cached [`PlanProperties`] derived from `data_source`; recomputed
    /// whenever the source is replaced.
    cache: Arc<PlanProperties>,
}
impl DisplayAs for DataSourceExec {
    /// Writes a `DataSourceExec: ` prefix for the default and verbose
    /// formats, then delegates the rest of the rendering to the wrapped
    /// source. Tree rendering skips the prefix entirely.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        if !matches!(t, DisplayFormatType::TreeRender) {
            write!(f, "DataSourceExec: ")?;
        }
        self.data_source.fmt_as(t, f)
    }
}
impl ExecutionPlan for DataSourceExec {
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the cached plan properties computed from the wrapped source.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Leaf node: never has children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Leaf node: the (empty) child list is ignored and `self` is returned
    /// unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Delegates repartitioning to the source; on success, rebuilds this
    /// node with the new source and its (possibly changed) partitioning.
    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let data_source = self.data_source.repartitioned(
            target_partitions,
            config.optimizer.repartition_file_min_size,
            self.properties().eq_properties.output_ordering(),
        )?;
        Ok(data_source.map(|source| {
            let output_partitioning = source.output_partitioning();
            let plan = self
                .clone()
                .with_data_source(source)
                // repartitioning can change the partition count, so refresh it
                .with_partitioning(output_partitioning);
            Arc::new(plan) as _
        }))
    }

    /// Opens the source's stream for `partition` and wraps it in a
    /// [`BatchSplitStream`] so emitted batches respect the session's
    /// configured batch size.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = self.data_source.open(partition, Arc::clone(&context))?;
        let batch_size = context.session_config().batch_size();
        log::debug!(
            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
        );
        // Split metrics are registered in the source's metric set so they
        // show up alongside the scan's own metrics.
        let metrics = self.data_source.metrics();
        let split_metrics = SplitMetrics::new(&metrics, partition);
        Ok(Box::pin(BatchSplitStream::new(
            stream,
            batch_size,
            split_metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.data_source.metrics().clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.data_source.partition_statistics(partition)
    }

    /// Pushes a row limit into the source if supported.
    ///
    /// NOTE(review): the cached plan properties are reused unchanged here,
    /// unlike other source-replacing paths which recompute them — presumably
    /// a fetch limit does not affect partitioning/equivalences; confirm.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let data_source = self.data_source.with_fetch(limit)?;
        let cache = Arc::clone(&self.cache);
        Some(Arc::new(Self { data_source, cache }))
    }

    fn fetch(&self) -> Option<usize> {
        self.data_source.fetch()
    }

    /// Asks the source to absorb `projection`; on success returns a new
    /// `DataSourceExec` (with freshly computed properties) replacing both
    /// the projection and the old scan.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        match self
            .data_source
            .try_swapping_with_projection(projection.projection_expr())?
        {
            Some(new_data_source) => {
                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
            }
            None => Ok(None),
        }
    }

    /// Forwards the filters collected from parent operators to the source.
    /// If the source produced an updated version of itself, this node is
    /// rebuilt around it with recomputed plan properties.
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        let parent_filters = child_pushdown_result
            .parent_filters
            .into_iter()
            .map(|f| f.filter)
            .collect_vec();
        let res = self
            .data_source
            .try_pushdown_filters(parent_filters, config)?;
        match res.updated_node {
            Some(data_source) => {
                let mut new_node = self.clone();
                new_node.data_source = data_source;
                // pushed filters may change statistics/equivalences
                new_node.cache =
                    Arc::new(Self::compute_properties(&new_node.data_source));
                Ok(FilterPushdownPropagation {
                    filters: res.filters,
                    updated_node: Some(Arc::new(new_node)),
                })
            }
            None => Ok(FilterPushdownPropagation {
                filters: res.filters,
                updated_node: None,
            }),
        }
    }

    /// Delegates sort pushdown to the source, re-wrapping any updated
    /// source in a new `DataSourceExec`.
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        self.data_source
            .try_pushdown_sort(order)?
            .try_map(|new_data_source| {
                let new_exec = self.clone().with_data_source(new_data_source);
                Ok(Arc::new(new_exec) as Arc<dyn ExecutionPlan>)
            })
    }

    /// Delegates order-preservation configuration to the source,
    /// re-wrapping any updated source in a new `DataSourceExec`.
    fn with_preserve_order(
        &self,
        preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        self.data_source
            .with_preserve_order(preserve_order)
            .map(|new_data_source| {
                Arc::new(self.clone().with_data_source(new_data_source))
                    as Arc<dyn ExecutionPlan>
            })
    }
}
impl DataSourceExec {
    /// Convenience constructor: wraps a concrete [`DataSource`] and returns
    /// the resulting node already inside an [`Arc`].
    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
        Arc::new(Self::new(Arc::new(data_source)))
    }

    /// Creates a new node around `data_source`, computing and caching its
    /// plan properties up front.
    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
        let cache = Arc::new(Self::compute_properties(&data_source));
        Self { data_source, cache }
    }

    /// Returns the wrapped [`DataSource`].
    pub fn data_source(&self) -> &Arc<dyn DataSource> {
        &self.data_source
    }

    /// Replaces the wrapped source, recomputing the cached plan properties
    /// from the new one.
    pub fn with_data_source(self, data_source: Arc<dyn DataSource>) -> Self {
        let cache = Arc::new(Self::compute_properties(&data_source));
        Self { data_source, cache }
    }

    /// Overrides the constraints stored in the cached plan properties.
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        let props = Arc::make_mut(&mut self.cache);
        props.set_constraints(constraints);
        self
    }

    /// Overrides the partitioning stored in the cached plan properties.
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        let props = Arc::make_mut(&mut self.cache);
        props.partitioning = partitioning;
        self
    }

    /// Derives [`PlanProperties`] from the source: its equivalences and
    /// partitioning, incremental emission, bounded input, and the source's
    /// own scheduling type.
    fn compute_properties(data_source: &Arc<dyn DataSource>) -> PlanProperties {
        let props = PlanProperties::new(
            data_source.eq_properties(),
            data_source.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        props.with_scheduling_type(data_source.scheduling_type())
    }

    /// Downcasts the wrapped source to a [`FileScanConfig`] whose file
    /// source is of type `T`, returning both on success.
    pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
        let file_scan_conf = self
            .data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()?;
        let source = file_scan_conf.file_source().as_any().downcast_ref::<T>()?;
        Some((file_scan_conf, source))
    }
}
impl<S> From<S> for DataSourceExec
where
    S: DataSource + 'static,
{
    /// Wraps any concrete [`DataSource`] in an [`Arc`] and builds the node
    /// via [`DataSourceExec::new`].
    fn from(source: S) -> Self {
        let data_source: Arc<dyn DataSource> = Arc::new(source);
        Self::new(data_source)
    }
}