use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, OnceLock};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_plan::execution_plan::{
Boundedness, EmissionType, SchedulingType,
};
use datafusion_physical_plan::metrics::SplitMetrics;
use datafusion_physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::stream::BatchSplitStream;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use itertools::Itertools;
use crate::file::FileSource;
use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
};
pub trait DataSource: Any + Send + Sync + Debug {
fn open(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
fn repartitioned(
&self,
_target_partitions: usize,
_repartition_file_min_size: usize,
_output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
Ok(None)
}
fn output_partitioning(&self) -> Partitioning;
fn eq_properties(&self) -> EquivalenceProperties;
fn scheduling_type(&self) -> SchedulingType {
SchedulingType::NonCooperative
}
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>>;
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
fn metrics(&self) -> ExecutionPlanMetricsSet {
ExecutionPlanMetricsSet::new()
}
fn try_swapping_with_projection(
&self,
_projection: &ProjectionExprs,
) -> Result<Option<Arc<dyn DataSource>>>;
fn try_pushdown_filters(
&self,
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
))
}
fn try_pushdown_sort(
&self,
_order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
Ok(SortOrderPushdownResult::Unsupported)
}
fn with_preserve_order(&self, _preserve_order: bool) -> Option<Arc<dyn DataSource>> {
None
}
fn with_new_state(
&self,
_state: Arc<dyn Any + Send + Sync>,
) -> Option<Arc<dyn DataSource>> {
None
}
fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
None
}
fn open_with_args(&self, args: OpenArgs) -> Result<SendableRecordBatchStream> {
self.open(args.partition, args.context)
}
}
/// Arguments passed to [`DataSource::open_with_args`].
#[derive(Clone, Debug)]
pub struct OpenArgs {
    /// Index of the partition to open.
    pub partition: usize,
    /// Task context the partition is opened with.
    pub context: Arc<TaskContext>,
    /// Optional opaque state; `DataSourceExec::execute` fills this with the
    /// value produced by `DataSource::create_sibling_state`.
    pub sibling_state: Option<Arc<dyn Any + Send + Sync>>,
}

impl OpenArgs {
    /// Builds arguments for `partition` and `context` with no sibling state.
    pub fn new(partition: usize, context: Arc<TaskContext>) -> Self {
        let sibling_state = None;
        Self {
            partition,
            context,
            sibling_state,
        }
    }

    /// Returns these arguments with `sibling_state` replaced by the given
    /// value (passing `None` clears it).
    pub fn with_shared_state(
        self,
        sibling_state: Option<Arc<dyn Any + Send + Sync>>,
    ) -> Self {
        Self {
            sibling_state,
            ..self
        }
    }
}
impl dyn DataSource {
    /// Returns `true` if the concrete type behind this trait object is `T`.
    pub fn is<T: DataSource>(&self) -> bool {
        // Upcast explicitly so the call resolves to `<dyn Any>::is` rather
        // than to this method.
        let as_any: &dyn Any = self;
        as_any.is::<T>()
    }

    /// Returns a reference to the concrete `T` behind this trait object, or
    /// `None` if the concrete type is not `T`.
    pub fn downcast_ref<T: DataSource>(&self) -> Option<&T> {
        // Upcast explicitly so the call resolves to `<dyn Any>::downcast_ref`.
        let as_any: &dyn Any = self;
        as_any.downcast_ref()
    }
}
/// Execution plan node that delegates to a [`DataSource`].
#[derive(Debug, Clone)]
pub struct DataSourceExec {
    /// The wrapped source that partitions are opened from.
    data_source: Arc<dyn DataSource>,
    /// Plan properties computed from `data_source` (see `compute_properties`).
    cache: Arc<PlanProperties>,
    /// Lazily initialized on the first `execute` call with the result of
    /// `DataSource::create_sibling_state`, then passed to every partition
    /// opened through this node. Cloning the struct shares this cell.
    execution_state: Arc<OnceLock<Option<Arc<dyn Any + Send + Sync>>>>,
}
impl DisplayAs for DataSourceExec {
    /// Formats this node by delegating to the wrapped data source.
    ///
    /// For the `Default` and `Verbose` formats the output is prefixed with
    /// `"DataSourceExec: "`; the `TreeRender` format gets no prefix.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::TreeRender => {}
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                f.write_str("DataSourceExec: ")?;
            }
        }
        self.data_source.fmt_as(t, f)
    }
}
impl ExecutionPlan for DataSourceExec {
    /// Static name of this operator.
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    /// Returns the cached plan properties.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// This node has no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Ignores the supplied children and returns `self` unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Asks the data source to repartition itself.
    ///
    /// Passes `target_partitions`, the configured
    /// `optimizer.repartition_file_min_size` and this node's current output
    /// ordering to [`DataSource::repartitioned`]. Returns `Ok(None)` when the
    /// source offers no repartitioned version.
    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let data_source = self.data_source.repartitioned(
            target_partitions,
            config.optimizer.repartition_file_min_size,
            self.properties().eq_properties.output_ordering(),
        )?;
        Ok(data_source.map(|source| {
            let output_partitioning = source.output_partitioning();
            let plan = self
                .clone()
                .with_data_source(source)
                // Explicitly set the partitioning reported by the new source.
                .with_partitioning(output_partitioning);
            Arc::new(plan) as _
        }))
    }

    /// Opens `partition` on the data source and wraps the resulting stream
    /// in a [`BatchSplitStream`] configured with the session batch size.
    ///
    /// # Errors
    /// Propagates any error returned by [`DataSource::open_with_args`].
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // The sibling state is created at most once per node and the same
        // value is handed to every partition opened through this node.
        let shared_state = self
            .execution_state
            .get_or_init(|| self.data_source.create_sibling_state())
            .clone();
        let args = OpenArgs::new(partition, Arc::clone(&context))
            .with_shared_state(shared_state);
        let stream = self.data_source.open_with_args(args)?;
        let batch_size = context.session_config().batch_size();
        log::debug!(
            "Batch splitting enabled for partition {partition}: batch_size={batch_size}"
        );
        let metrics = self.data_source.metrics();
        let split_metrics = SplitMetrics::new(&metrics, partition);
        Ok(Box::pin(BatchSplitStream::new(
            stream,
            batch_size,
            split_metrics,
        )))
    }

    /// Returns a snapshot of the data source's metrics.
    ///
    /// When the source is a [`FileScanConfig`] whose file type is
    /// `"parquet"`, and an output-rows skew metric can be derived from the
    /// snapshot, that metric is appended to the result.
    fn metrics(&self) -> Option<MetricsSet> {
        let mut metrics = self.data_source.metrics().clone_inner();
        if let Some(file_scan_config) = self.data_source.downcast_ref::<FileScanConfig>()
            && file_scan_config.file_source().file_type() == "parquet"
            && let Some(output_rows_skew) =
                BaselineMetrics::output_rows_skew_metric(&metrics)
        {
            metrics.push(output_rows_skew);
        }
        Some(metrics)
    }

    /// Delegates to [`DataSource::partition_statistics`].
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
        self.data_source.partition_statistics(partition)
    }

    /// Returns a new node whose data source has `limit` applied, or `None`
    /// if the source returns `None` from [`DataSource::with_fetch`].
    ///
    /// The plan properties are reused as-is; the execution state is fresh.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let data_source = self.data_source.with_fetch(limit)?;
        let cache = Arc::clone(&self.cache);
        let execution_state = Arc::new(OnceLock::new());
        Some(Arc::new(Self {
            data_source,
            cache,
            execution_state,
        }))
    }

    /// Delegates to [`DataSource::fetch`].
    fn fetch(&self) -> Option<usize> {
        self.data_source.fetch()
    }

    /// Tries to absorb `projection` into the data source.
    ///
    /// Returns a brand-new [`DataSourceExec`] around the replacement source
    /// when the source produces one, and `Ok(None)` otherwise.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let swapped = self
            .data_source
            .try_swapping_with_projection(projection.projection_expr())?;
        Ok(swapped.map(|new_data_source| {
            Arc::new(DataSourceExec::new(new_data_source)) as Arc<dyn ExecutionPlan>
        }))
    }

    /// Offers the parent filters to the data source.
    ///
    /// When the source returns an updated version of itself, the returned
    /// node is rebuilt through [`DataSourceExec::with_data_source`], which
    /// recomputes the plan properties and gives the node a fresh execution
    /// state, matching every other method here that swaps the data source.
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        let parent_filters = child_pushdown_result
            .parent_filters
            .into_iter()
            .map(|f| f.filter)
            .collect_vec();
        let res = self
            .data_source
            .try_pushdown_filters(parent_filters, config)?;
        // Do not share the old node's `execution_state` with the new node:
        // that state was (or would be) created by the previous data source.
        let updated_node = res.updated_node.map(|data_source| {
            Arc::new(self.clone().with_data_source(data_source))
                as Arc<dyn ExecutionPlan>
        });
        Ok(FilterPushdownPropagation {
            filters: res.filters,
            updated_node,
        })
    }

    /// Forwards the sort pushdown request to the data source and wraps any
    /// replacement source in a node derived from this one.
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        self.data_source
            .try_pushdown_sort(order)?
            .try_map(|new_data_source| {
                let new_exec = self.clone().with_data_source(new_data_source);
                Ok(Arc::new(new_exec) as Arc<dyn ExecutionPlan>)
            })
    }

    /// Forwards to [`DataSource::with_preserve_order`] and wraps any
    /// replacement source in a node derived from this one.
    fn with_preserve_order(
        &self,
        preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        self.data_source
            .with_preserve_order(preserve_order)
            .map(|new_data_source| {
                Arc::new(self.clone().with_data_source(new_data_source))
                    as Arc<dyn ExecutionPlan>
            })
    }

    /// Forwards to [`DataSource::with_new_state`] and wraps any replacement
    /// source in a node derived from this one.
    fn with_new_state(
        &self,
        state: Arc<dyn Any + Send + Sync>,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        self.data_source
            .with_new_state(state)
            .map(|new_data_source| {
                Arc::new(self.clone().with_data_source(new_data_source))
                    as Arc<dyn ExecutionPlan>
            })
    }

    /// Returns this node with a fresh, uninitialized execution state.
    ///
    /// The data source and plan properties are kept.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let mut new_exec = Arc::unwrap_or_clone(self);
        new_exec.execution_state = Arc::new(OnceLock::new());
        Ok(Arc::new(new_exec))
    }
}
impl DataSourceExec {
    /// Wraps a concrete data source in an `Arc<DataSourceExec>`.
    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
        let source: Arc<dyn DataSource> = Arc::new(data_source);
        Arc::new(Self::new(source))
    }

    /// Creates a node around `data_source`, computing its plan properties
    /// and starting with an uninitialized execution state.
    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
        let properties = Self::compute_properties(&data_source);
        Self {
            data_source,
            cache: Arc::new(properties),
            execution_state: Arc::new(OnceLock::new()),
        }
    }

    /// Returns the wrapped data source.
    pub fn data_source(&self) -> &Arc<dyn DataSource> {
        &self.data_source
    }

    /// Replaces the data source, recomputing the plan properties from it and
    /// resetting the execution state.
    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
        let properties = Self::compute_properties(&data_source);
        self.cache = Arc::new(properties);
        self.data_source = data_source;
        self.execution_state = Arc::new(OnceLock::new());
        self
    }

    /// Sets `constraints` on the cached plan properties.
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        // `make_mut` clones the properties first if the `Arc` is shared.
        let properties = Arc::make_mut(&mut self.cache);
        properties.set_constraints(constraints);
        self
    }

    /// Overrides the partitioning stored in the cached plan properties.
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        // `make_mut` clones the properties first if the `Arc` is shared.
        let properties = Arc::make_mut(&mut self.cache);
        properties.partitioning = partitioning;
        self
    }

    /// Builds plan properties from the source's equivalence properties,
    /// output partitioning and scheduling type, with incremental emission
    /// and bounded input.
    fn compute_properties(data_source: &Arc<dyn DataSource>) -> PlanProperties {
        let eq_properties = data_source.eq_properties();
        let partitioning = data_source.output_partitioning();
        let properties = PlanProperties::new(
            eq_properties,
            partitioning,
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        properties.with_scheduling_type(data_source.scheduling_type())
    }

    /// Returns the [`FileScanConfig`] and its file source downcast to `T`.
    ///
    /// Yields `None` if the data source is not a `FileScanConfig` or if its
    /// file source is not a `T`.
    pub fn downcast_to_file_source<T: FileSource>(
        &self,
    ) -> Option<(&FileScanConfig, &T)> {
        let file_scan_conf = self.data_source().downcast_ref::<FileScanConfig>()?;
        let source = file_scan_conf.file_source().downcast_ref::<T>()?;
        Some((file_scan_conf, source))
    }
}
impl<S> From<S> for DataSourceExec
where
S: DataSource + 'static,
{
fn from(source: S) -> Self {
Self::new(Arc::new(source))
}
}