use crate::{DistributedConfig, TaskCountAnnotation, TaskEstimation, TaskEstimator};
use crate::{WorkUnitFeed, WorkUnitFeedProto, WorkUnitFeedProvider};
use datafusion::catalog::memory::DataSourceExec;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::{Result, internal_datafusion_err};
use datafusion::common::{Statistics, internal_err};
use datafusion::config::ConfigOptions;
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, FileScanConfigBuilder};
use datafusion::datasource::source::DataSource;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::execution_plan::SchedulingType;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion_proto::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
};
use datafusion_proto::protobuf as df_proto;
use datafusion_proto::protobuf::proto_error;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use prost::Message;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileScanWorkUnit {
#[prost(message, tag = "1")]
pub file: Option<df_proto::PartitionedFile>,
}
#[derive(Debug, Clone)]
pub struct FileScanWorkUnitProvider {
file_groups: Vec<FileGroup>,
}
impl FileScanWorkUnitProvider {
pub fn new(file_groups: Vec<FileGroup>) -> Self {
Self { file_groups }
}
}
impl WorkUnitFeedProvider for FileScanWorkUnitProvider {
type WorkUnit = FileScanWorkUnit;
fn feed(
&self,
partition: usize,
_ctx: Arc<TaskContext>,
) -> Result<BoxStream<'static, Result<Self::WorkUnit>>> {
let Some(file_group) = self.file_groups.get(partition) else {
return Ok(futures::stream::empty().boxed());
};
let stream = futures::stream::iter(file_group.files().to_vec()).map(|file| {
let file_proto: df_proto::PartitionedFile = (&file)
.try_into()
.map_err(|e| internal_datafusion_err!("{e}"))?;
Ok(FileScanWorkUnit {
file: Some(file_proto),
})
});
Ok(stream.boxed())
}
}
#[derive(Debug, Clone)]
pub struct WorkUnitFileScanConfig {
pub feed: WorkUnitFeed<FileScanWorkUnitProvider>,
pub fsc: FileScanConfig,
pub partitions: usize,
}
impl WorkUnitFileScanConfig {
pub fn new(mut fsc: FileScanConfig) -> Self {
let file_groups = std::mem::take(&mut fsc.file_groups);
Self {
partitions: file_groups.len(),
feed: WorkUnitFeed::new(FileScanWorkUnitProvider::new(file_groups)),
fsc,
}
}
}
impl DataSource for WorkUnitFileScanConfig {
fn open(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let inner = self.fsc.clone();
let schema = inner.projected_schema()?;
let stream = self
.feed
.feed(partition, Arc::clone(&context))?
.map(move |work_unit| {
let file = work_unit?.file.expect("missing file");
let single_file_group = df_proto::FileGroup { files: vec![file] };
let single_file_group = FileGroup::try_from(&single_file_group)?;
let new_config = FileScanConfigBuilder::from(inner.clone())
.with_file_groups(vec![single_file_group])
.build();
new_config.open(0, Arc::clone(&context))
})
.try_flatten();
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "WorkUnitFileScan: ",)?;
self.fsc.fmt_as(t, f)
}
DisplayFormatType::TreeRender => {
writeln!(f, "WorkUnitFileScan")?;
Ok(())
}
}
}
fn repartitioned(
&self,
_target_partitions: usize,
_repartition_file_min_size: usize,
_output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
Ok(None)
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.partitions)
}
fn eq_properties(&self) -> EquivalenceProperties {
self.fsc.eq_properties()
}
fn scheduling_type(&self) -> SchedulingType {
SchedulingType::Cooperative
}
fn partition_statistics(&self, _partition: Option<usize>) -> Result<Arc<Statistics>> {
self.fsc.partition_statistics(None)
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
let new_template = FileScanConfigBuilder::from(self.fsc.clone())
.with_limit(limit)
.build();
Some(Arc::new(WorkUnitFileScanConfig {
feed: self.feed.clone(),
fsc: new_template,
partitions: self.partitions,
}))
}
fn fetch(&self) -> Option<usize> {
self.fsc.limit
}
fn metrics(&self) -> ExecutionPlanMetricsSet {
self.feed.metrics()
}
fn try_swapping_with_projection(
&self,
_projection: &datafusion::physical_expr::projection::ProjectionExprs,
) -> Result<Option<Arc<dyn DataSource>>> {
Ok(None)
}
}
#[derive(Debug)]
pub struct WorkUnitFileScanCodec;
#[derive(Clone, PartialEq, ::prost::Message)]
struct WorkUnitFileScanProto {
#[prost(bytes, tag = "1")]
inner: Vec<u8>,
#[prost(message, optional, tag = "2")]
feed: Option<WorkUnitFeedProto>,
#[prost(uint64, tag = "3")]
partitions: u64,
}
impl PhysicalExtensionCodec for WorkUnitFileScanCodec {
fn try_decode(
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
if !inputs.is_empty() {
return internal_err!(
"WorkUnitFileScanConfig should have no children, got {}",
inputs.len()
);
}
let proto = WorkUnitFileScanProto::decode(buf)
.map_err(|e| proto_error(format!("Failed to decode WorkUnitFileScanProto: {e}")))?;
let plan_node = df_proto::PhysicalPlanNode::decode(&proto.inner[..])
.map_err(|e| proto_error(format!("Failed to decode template plan: {e}")))?;
let template_plan =
plan_node.try_into_physical_plan(ctx, &DefaultPhysicalExtensionCodec {})?;
let Some(dse) = template_plan.downcast_ref::<DataSourceExec>() else {
return Err(proto_error(
"Expected the WorkUnitFileScan template plan to be a DataSourceExec",
));
};
let Some(inner) = dse.data_source().downcast_ref::<FileScanConfig>() else {
return Err(proto_error(
"Expected the WorkUnitFileScan template DataSource to be a FileScanConfig",
));
};
let Some(feed_proto) = proto.feed else {
return Err(proto_error("WorkUnitFileScanProto missing feed"));
};
Ok(DataSourceExec::from_data_source(WorkUnitFileScanConfig {
feed: WorkUnitFeed::from_proto(feed_proto)?,
fsc: inner.clone(),
partitions: proto.partitions as usize,
}))
}
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
let Some(dse) = node.downcast_ref::<DataSourceExec>() else {
return internal_err!(
"Expected DataSourceExec wrapping a WorkUnitFileScanConfig, got {}",
node.name()
);
};
let Some(wfs) = dse.data_source().downcast_ref::<WorkUnitFileScanConfig>() else {
return internal_err!("Expected the inner DataSource to be a WorkUnitFileScanConfig");
};
let plan_node = df_proto::PhysicalPlanNode::try_from_physical_plan(
DataSourceExec::from_data_source(wfs.fsc.clone()),
&DefaultPhysicalExtensionCodec {},
)?;
let mut inner_bytes = Vec::new();
plan_node.encode(&mut inner_bytes).map_err(|e| {
proto_error(format!(
"Failed to encode WorkUnitFileScan template plan: {e}"
))
})?;
let proto = WorkUnitFileScanProto {
inner: inner_bytes,
feed: Some(wfs.feed.to_proto()),
partitions: wfs.partitions as u64,
};
proto
.encode(buf)
.map_err(|e| proto_error(format!("Failed to encode WorkUnitFileScanProto: {e}")))
}
}
#[derive(Debug, Default)]
pub struct WorkUnitFileScanRule;
impl PhysicalOptimizerRule for WorkUnitFileScanRule {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(|node| {
let Some(dse) = node.downcast_ref::<DataSourceExec>() else {
return Ok(Transformed::no(node));
};
let Some(fsc) = dse.data_source().downcast_ref::<FileScanConfig>() else {
return Ok(Transformed::no(node));
};
let new_ds = WorkUnitFileScanConfig::new(fsc.clone());
Ok(Transformed::yes(DataSourceExec::from_data_source(new_ds)))
})
.map(|t| t.data)
}
fn name(&self) -> &str {
"WorkUnitFileScanRule"
}
fn schema_check(&self) -> bool {
true
}
}
#[derive(Debug, Default)]
pub struct WorkUnitFileScanTaskEstimator;
impl TaskEstimator for WorkUnitFileScanTaskEstimator {
fn task_estimation(
&self,
plan: &Arc<dyn ExecutionPlan>,
cfg: &ConfigOptions,
) -> Option<TaskEstimation> {
let dse = plan.downcast_ref::<DataSourceExec>()?;
let wfs = dse.data_source().downcast_ref::<WorkUnitFileScanConfig>()?;
let d_cfg = cfg.extensions.get::<DistributedConfig>()?;
let mut total_bytes = 0;
for file_group in &wfs.feed.inner()?.file_groups {
for file in file_group.files() {
total_bytes += file.effective_size() as usize;
}
}
let task_count = total_bytes
.div_ceil(d_cfg.file_scan_config_bytes_per_partition)
.div_ceil(cfg.execution.target_partitions);
Some(TaskEstimation {
task_count: TaskCountAnnotation::Desired(task_count),
})
}
fn scale_up_leaf_node(
&self,
plan: &Arc<dyn ExecutionPlan>,
task_count: usize,
_cfg: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let Some(dse) = plan.downcast_ref::<DataSourceExec>() else {
return Ok(None);
};
let Some(wfs) = dse.data_source().downcast_ref::<WorkUnitFileScanConfig>() else {
return Ok(None);
};
let wuf_provider = wfs.feed.try_inner()?;
let mut new_file_groups = vec![];
for file_group in wuf_provider.file_groups.clone() {
new_file_groups.extend(file_group.split_files(task_count));
}
let new_provider = FileScanWorkUnitProvider::new(new_file_groups);
Ok(Some(
DataSourceExec::from_data_source(WorkUnitFileScanConfig {
feed: WorkUnitFeed::new(new_provider),
fsc: wfs.fsc.clone(),
partitions: wfs.partitions,
}) as Arc<dyn ExecutionPlan>,
))
}
}