use std::any::Any;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
#[expect(deprecated)]
use crate::schema_adapter::SchemaAdapterFactory;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, not_impl_err};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use object_store::ObjectStore;
/// Wraps a concrete [`FileSource`] implementation in an [`Arc`] and returns it
/// as a type-erased `Arc<dyn FileSource>` trait object.
///
/// The `'static` bound is required because the returned trait object carries
/// no lifetime parameter.
pub fn as_file_source<T>(source: T) -> Arc<dyn FileSource>
where
    T: FileSource + 'static,
{
    Arc::new(source)
}
pub trait FileSource: Send + Sync {
fn create_file_opener(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> Result<Arc<dyn FileOpener>>;
fn as_any(&self) -> &dyn Any;
fn table_schema(&self) -> &crate::table_schema::TableSchema;
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
None
}
fn projection(&self) -> Option<&ProjectionExprs> {
None
}
fn metrics(&self) -> &ExecutionPlanMetricsSet;
fn file_type(&self) -> &str;
fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
Ok(())
}
fn supports_repartitioning(&self) -> bool {
true
}
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
) -> Result<Option<FileScanConfig>> {
if config.file_compression_type.is_compressed() || !self.supports_repartitioning()
{
return Ok(None);
}
let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(output_ordering.is_some())
.repartition_file_groups(&config.file_groups);
if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut source = config.clone();
source.file_groups = repartitioned_file_groups;
return Ok(Some(source));
}
Ok(None)
}
fn try_pushdown_filters(
&self,
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
))
}
fn try_pushdown_sort(
&self,
order: &[PhysicalSortExpr],
eq_properties: &EquivalenceProperties,
) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
#[expect(deprecated)]
self.try_reverse_output(order, eq_properties)
}
#[deprecated(
since = "53.0.0",
note = "Renamed to try_pushdown_sort. This method was never limited to reversing output. It will be removed in 59.0.0 or later."
)]
fn try_reverse_output(
&self,
_order: &[PhysicalSortExpr],
_eq_properties: &EquivalenceProperties,
) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
Ok(SortOrderPushdownResult::Unsupported)
}
fn try_pushdown_projection(
&self,
_projection: &ProjectionExprs,
) -> Result<Option<Arc<dyn FileSource>>> {
Ok(None)
}
#[deprecated(
since = "53.0.0",
note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
)]
#[expect(deprecated)]
fn with_schema_adapter_factory(
&self,
_factory: Arc<dyn SchemaAdapterFactory>,
) -> Result<Arc<dyn FileSource>> {
not_impl_err!(
"SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
)
}
#[deprecated(
since = "53.0.0",
note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
)]
#[expect(deprecated)]
fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
None
}
}