use crate::config::SchemaSource;
use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
use crate::{ListingOptions, ListingTableConfig};
use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
use async_trait::async_trait;
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion_common::stats::Precision;
use datafusion_common::{
Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig};
#[expect(deprecated)]
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::empty::EmptyExec;
use futures::{Stream, StreamExt, TryStreamExt, future, stream};
use object_store::ObjectStore;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
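/// Result of listing the files for a scan, as returned by
/// [`ListingTable::list_files_for_scan`]: the pruned files grouped for
/// parallel processing, their combined statistics, and whether the groups
/// preserve Hive partition boundaries.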
#[derive(Debug)]
pub struct ListFilesResult {
pub file_groups: Vec<FileGroup>,
pub statistics: Statistics,
pub grouped_by_partition: bool,
}
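/// An implementation of [`TableProvider`] that reads the files found at one or
/// more [`ListingTableUrl`]s as a single table, optionally interpreting
/// Hive-style partition directories (e.g. `/date=2024-01-01/`) as table
/// columns.
///
/// # Example
///
/// A minimal sketch of building and registering a `ListingTable` over a
/// directory of Parquet files (assumes the `datafusion` facade crate and an
/// async, `Result`-returning context; exact re-export paths may differ
/// between versions):
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion::datasource::file_format::parquet::ParquetFormat;
/// use datafusion::datasource::listing::{
///     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
/// };
/// use datafusion::prelude::SessionContext;
///
/// let ctx = SessionContext::new();
/// let table_path = ListingTableUrl::parse("file:///path/to/data/")?;
/// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
///     .with_file_extension(".parquet");
/// // Resolve the file schema from the files themselves
/// let schema = options.infer_schema(&ctx.state(), &table_path).await?;
/// let config = ListingTableConfig::new(table_path)
///     .with_listing_options(options)
///     .with_schema(schema);
/// ctx.register_table("my_table", Arc::new(ListingTable::try_new(config)?))?;
/// ```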
#[derive(Debug, Clone)]
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
file_schema: SchemaRef,
table_schema: SchemaRef,
schema_source: SchemaSource,
options: ListingOptions,
definition: Option<String>,
collected_statistics: Arc<dyn FileStatisticsCache>,
constraints: Constraints,
column_defaults: HashMap<String, Expr>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
impl ListingTable {
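    /// Create a new [`ListingTable`] from the given [`ListingTableConfig`].
    ///
    /// Returns an error if the config is missing a resolved file schema or
    /// [`ListingOptions`]. The table schema is built by appending the
    /// configured partition columns to the file schema.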
pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result<Self> {
let schema_source = config.schema_source();
let file_schema = config
.file_schema
.ok_or_else(|| internal_datafusion_err!("No schema provided."))?;
let options = config
.options
.ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;
let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
for (part_col_name, part_col_type) in &options.table_partition_cols {
builder.push(Field::new(part_col_name, part_col_type.clone(), false));
}
let table_schema = Arc::new(
builder
.finish()
.with_metadata(file_schema.metadata().clone()),
);
let table = Self {
table_paths: config.table_paths,
file_schema,
table_schema,
schema_source,
options,
definition: None,
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
constraints: Constraints::default(),
column_defaults: HashMap::new(),
expr_adapter_factory: config.expr_adapter_factory,
};
Ok(table)
}
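    /// Set the [`Constraints`] (e.g. primary/unique keys) for this table.
    /// Defaults to [`Constraints::default()`].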
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.constraints = constraints;
self
}
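    /// Assign column default expressions, returned later from
    /// [`TableProvider::get_column_default`].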
pub fn with_column_defaults(
mut self,
column_defaults: HashMap<String, Expr>,
) -> Self {
self.column_defaults = column_defaults;
self
}
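    /// Set the file statistics cache, falling back to a per-table
    /// [`DefaultFileStatisticsCache`] when `None` is passed.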
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
self.collected_statistics =
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
self
}
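    /// Set the SQL text originally used to define this table, if any.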
pub fn with_definition(mut self, definition: Option<String>) -> Self {
self.definition = definition;
self
}
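    /// Get the paths this table scans.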
pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
&self.table_paths
}
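    /// Get the [`ListingOptions`] for this table.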
pub fn options(&self) -> &ListingOptions {
&self.options
}
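    /// Return the [`SchemaSource`] recording how this table's schema was
    /// obtained.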
pub fn schema_source(&self) -> SchemaSource {
self.schema_source
}
#[deprecated(
since = "52.0.0",
note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
)]
#[expect(deprecated)]
pub fn with_schema_adapter_factory(
self,
_schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
self
}
#[deprecated(
since = "52.0.0",
note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
)]
#[expect(deprecated)]
pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
None
}
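    /// Build the [`FileSource`] for this table's format, pairing the file
    /// schema with the (non-nullable) partition columns.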
fn create_file_source(&self) -> Arc<dyn FileSource> {
let table_schema = TableSchema::new(
Arc::clone(&self.file_schema),
self.options
.table_partition_cols
.iter()
                .map(|(col, data_type)| Arc::new(Field::new(col, data_type.clone(), false)))
.collect(),
);
self.options.format.file_source(table_schema)
}
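    /// Compute the output orderings this table can guarantee.
    ///
    /// If `file_sort_order` is set on the [`ListingOptions`], it is converted
    /// into physical [`LexOrdering`]s; otherwise an ordering common to every
    /// file in `file_groups` is derived, if one exists. Returns an empty
    /// `Vec` when no ordering can be established.
    ///
    /// A minimal sketch of declaring a sort order (assumes the `datafusion`
    /// facade crate and a pre-built `listing_options`; paths may differ
    /// between versions):
    ///
    /// ```ignore
    /// use datafusion::prelude::col;
    /// // Declare that every file is sorted by "a" ascending, nulls last
    /// let options = listing_options
    ///     .with_file_sort_order(vec![vec![col("a").sort(true, false)]]);
    /// ```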
pub fn try_create_output_ordering(
&self,
execution_props: &ExecutionProps,
file_groups: &[FileGroup],
) -> datafusion_common::Result<Vec<LexOrdering>> {
if !self.options.file_sort_order.is_empty() {
return create_lex_ordering(
&self.table_schema,
&self.options.file_sort_order,
execution_props,
);
}
if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
return Ok(vec![ordering]);
}
Ok(vec![])
}
}
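/// Derive an ordering shared by all files, if one exists: the longest common
/// prefix of every file's declared ordering.
///
/// Returns `None` if there are no files, any file lacks an ordering, or two
/// orderings share no common prefix.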
fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
enum CurrentOrderingState {
FirstFile,
SomeOrdering(LexOrdering),
NoOrdering,
}
let mut state = CurrentOrderingState::FirstFile;
for group in file_groups {
for file in group.iter() {
state = match (&state, &file.ordering) {
(CurrentOrderingState::FirstFile, Some(ordering)) => {
CurrentOrderingState::SomeOrdering(ordering.clone())
}
(CurrentOrderingState::FirstFile, None) => {
CurrentOrderingState::NoOrdering
}
(CurrentOrderingState::SomeOrdering(current), Some(ordering)) => {
let prefix_len = current
.as_ref()
.iter()
.zip(ordering.as_ref().iter())
.take_while(|(a, b)| a == b)
.count();
if prefix_len == 0 {
log::trace!(
"Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
);
return None;
} else {
let ordering =
LexOrdering::new(current.as_ref()[..prefix_len].to_vec())
.expect("prefix_len > 0, so ordering must be valid");
CurrentOrderingState::SomeOrdering(ordering)
}
}
(CurrentOrderingState::SomeOrdering(ordering), None)
| (CurrentOrderingState::NoOrdering, Some(ordering)) => {
log::trace!(
"Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
);
return None;
}
(CurrentOrderingState::NoOrdering, None) => {
CurrentOrderingState::NoOrdering
}
};
}
}
match state {
CurrentOrderingState::SomeOrdering(ordering) => Some(ordering),
_ => None,
}
}
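/// Returns `true` if `expr` references only partition columns and can
/// therefore be evaluated exactly while pruning the file listing.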
fn can_be_evaluated_for_partition_pruning(
partition_column_names: &[&str],
expr: &Expr,
) -> bool {
!partition_column_names.is_empty()
&& expr_applicable_for_cols(partition_column_names, expr)
}
#[async_trait]
impl TableProvider for ListingTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema)
}
fn constraints(&self) -> Option<&Constraints> {
Some(&self.constraints)
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let options = ScanArgs::default()
.with_projection(projection.map(|p| p.as_slice()))
.with_filters(Some(filters))
.with_limit(limit);
Ok(self.scan_with_args(state, options).await?.into_inner())
}
async fn scan_with_args<'a>(
&self,
state: &dyn Session,
args: ScanArgs<'a>,
) -> datafusion_common::Result<ScanResult> {
let projection = args.projection().map(|p| p.to_vec());
let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
let limit = args.limit();
let table_partition_cols = self
.options
.table_partition_cols
.iter()
            .map(|(name, _)| Ok(Arc::new(self.table_schema.field_with_name(name)?.clone())))
.collect::<datafusion_common::Result<Vec<_>>>()?;
let table_partition_col_names = table_partition_cols
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();
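        // Split the filters: those referencing only partition columns are
        // evaluated exactly during file pruning; the rest must be applied to
        // file contents.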
let (partition_filters, filters): (Vec<_>, Vec<_>) =
filters.iter().cloned().partition(|filter| {
can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
});
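        // A limit can only bound the file listing when no file-level filters
        // remain, since such filters may discard rows after listing.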
let statistic_file_limit = if filters.is_empty() { limit } else { None };
let ListFilesResult {
file_groups: mut partitioned_file_lists,
statistics,
grouped_by_partition: partitioned_by_file_group,
} = self
.list_files_for_scan(state, &partition_filters, statistic_file_limit)
.await?;
if partitioned_file_lists.is_empty() {
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
}
let output_ordering = self.try_create_output_ordering(
state.execution_props(),
&partitioned_file_lists,
)?;
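        // Optionally regroup the files by their statistics so each partition
        // is internally sorted, allowing the output ordering to be preserved.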
match state
.config_options()
.execution
.split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
self.options.target_partitions,
)
})
})
.flatten()
{
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
Some(Ok(new_groups)) => {
if new_groups.len() <= self.options.target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!(
"attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered"
)
}
}
            None => {}
        }
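        // All table paths share a single object store; with no paths there is
        // nothing to scan.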
let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
else {
return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
Schema::empty(),
)))));
};
let file_source = self.create_file_source();
let plan = self
.options
.format
.create_physical_plan(
state,
FileScanConfigBuilder::new(object_store_url, file_source)
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
.with_projection_indices(projection)?
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_expr_adapter(self.expr_adapter_factory.clone())
.with_partitioned_by_file_group(partitioned_by_file_group)
.build(),
)
.await?;
Ok(ScanResult::new(plan))
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
let partition_column_names = self
.options
.table_partition_cols
.iter()
            .map(|(name, _)| name.as_str())
.collect::<Vec<_>>();
filters
.iter()
.map(|filter| {
if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
{
return Ok(TableProviderFilterPushDown::Exact);
}
Ok(TableProviderFilterPushDown::Inexact)
})
.collect()
}
fn get_table_definition(&self) -> Option<&str> {
self.definition.as_deref()
}
async fn insert_into(
&self,
state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
insert_op: InsertOp,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
self.schema()
.logically_equivalent_names_and_types(&input.schema())?;
let table_path = &self.table_paths()[0];
if !table_path.is_collection() {
return plan_err!(
"Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
);
}
let store = state.runtime_env().object_store(table_path)?;
let file_list_stream = pruned_partition_list(
state,
store.as_ref(),
table_path,
&[],
&self.options.file_extension,
&self.options.table_partition_cols,
)
.await?;
let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
let keep_partition_by_columns =
state.config_options().execution.keep_partition_by_columns;
if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
let key = TableScopedPath {
table: table_path.get_table_ref().clone(),
path: table_path.prefix().clone(),
};
let _ = lfc.remove(&key);
}
let config = FileSinkConfig {
original_url: String::default(),
object_store_url: self.table_paths()[0].object_store(),
table_paths: self.table_paths().clone(),
file_group,
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
insert_op,
keep_partition_by_columns,
file_extension: self.options().format.get_ext(),
file_output_mode: FileOutputMode::Automatic,
};
let orderings = self.try_create_output_ordering(state.execution_props(), &[])?;
let order_requirements = orderings.into_iter().next().map(Into::into);
self.options()
.format
.create_writer_physical_plan(input, state, config, order_requirements)
.await
}
fn get_column_default(&self, column: &str) -> Option<&Expr> {
self.column_defaults.get(column)
}
}
impl ListingTable {
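    /// List the files for a scan along with their summary statistics.
    ///
    /// Applies `filters` while pruning the partition listing, optionally
    /// collects per-file statistics and orderings (served from
    /// `collected_statistics` when cached), stops listing early once `limit`
    /// rows are provably reached, and groups the result into at most
    /// `target_partitions` file groups.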
pub async fn list_files_for_scan<'a>(
&'a self,
ctx: &'a dyn Session,
filters: &'a [Expr],
limit: Option<usize>,
) -> datafusion_common::Result<ListFilesResult> {
let store = if let Some(url) = self.table_paths.first() {
ctx.runtime_env().object_store(url)?
} else {
return Ok(ListFilesResult {
file_groups: vec![],
statistics: Statistics::new_unknown(&self.file_schema),
grouped_by_partition: false,
});
};
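        // List (and prune) files under every table path concurrently.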
let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
pruned_partition_list(
ctx,
store.as_ref(),
table_path,
filters,
&self.options.file_extension,
&self.options.table_partition_cols,
)
}))
.await?;
let meta_fetch_concurrency =
ctx.config_options().execution.meta_fetch_concurrency;
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
let (statistics, ordering) = if self.options.collect_stat {
self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
.await?
} else {
(Arc::new(Statistics::new_unknown(&self.file_schema)), None)
};
Ok(part_file
.with_statistics(statistics)
.with_ordering(ordering))
})
.boxed()
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;
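        // Keep files grouped by their partition values when that produces at
        // least `preserve_file_partitions` groups; otherwise redistribute the
        // files evenly across `target_partitions`.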
let threshold = ctx.config_options().optimizer.preserve_file_partitions;
let (file_groups, grouped_by_partition) = if threshold > 0
&& !self.options.table_partition_cols.is_empty()
{
let grouped =
file_group.group_by_partition_values(self.options.target_partitions);
if grouped.len() >= threshold {
(grouped, true)
} else {
let all_files: Vec<_> =
grouped.into_iter().flat_map(|g| g.into_inner()).collect();
(
FileGroup::new(all_files).split_files(self.options.target_partitions),
false,
)
}
} else {
(
file_group.split_files(self.options.target_partitions),
false,
)
};
let (file_groups, stats) = compute_all_files_statistics(
file_groups,
self.schema(),
self.options.collect_stat,
inexact_stats,
)?;
Ok(ListFilesResult {
file_groups,
statistics: stats,
grouped_by_partition,
})
}
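    /// Collect the statistics and ordering for a single file, consulting the
    /// statistics cache first and writing newly inferred results back to it.
    /// Cached entries are revalidated against the file's current
    /// [`object_store::ObjectMeta`].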
async fn do_collect_statistics_and_ordering(
&self,
ctx: &dyn Session,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
use datafusion_execution::cache::cache_manager::CachedFileMetadata;
let path = &part_file.object_meta.location;
let meta = &part_file.object_meta;
if let Some(cached) = self.collected_statistics.get(path)
&& cached.is_valid_for(meta)
{
return Ok((Arc::clone(&cached.statistics), cached.ordering.clone()));
}
let file_meta = self
.options
.format
.infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
.await?;
let statistics = Arc::new(file_meta.statistics);
self.collected_statistics.put(
path,
CachedFileMetadata::new(
meta.clone(),
Arc::clone(&statistics),
file_meta.ordering.clone(),
),
);
Ok((statistics, file_meta.ordering))
}
}
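/// Drain `files` into a single [`FileGroup`], stopping early once the exact
/// accumulated row count exceeds `limit` (when provided). The returned flag
/// is `true` when the stream was cut short, meaning the collected statistics
/// are inexact.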
async fn get_files_with_limit(
files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
limit: Option<usize>,
collect_stats: bool,
) -> datafusion_common::Result<(FileGroup, bool)> {
let mut file_group = FileGroup::default();
let mut all_files = Box::pin(files.fuse());
enum ProcessingState {
ReadingFiles,
ReachedLimit,
}
let mut state = ProcessingState::ReadingFiles;
let mut num_rows = Precision::Absent;
while let Some(file_result) = all_files.next().await {
if matches!(state, ProcessingState::ReachedLimit) {
break;
}
let file = file_result?;
if collect_stats && let Some(file_stats) = &file.statistics {
num_rows = if file_group.is_empty() {
file_stats.num_rows
} else {
num_rows.add(&file_stats.num_rows)
};
}
file_group.push(file);
if let Some(limit) = limit
&& let Precision::Exact(row_count) = num_rows
&& row_count > limit
{
state = ProcessingState::ReachedLimit;
}
}
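        // If the fused stream still yields an item, listing stopped early due
        // to the limit, so the accumulated statistics are inexact.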
let inexact_stats = all_files.next().await.is_some();
Ok((file_group, inexact_stats))
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::compute::SortOptions;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use std::sync::Arc;
fn sort_expr(
name: &str,
idx: usize,
descending: bool,
nulls_first: bool,
) -> PhysicalSortExpr {
PhysicalSortExpr::new(
Arc::new(Column::new(name, idx)),
SortOptions {
descending,
nulls_first,
},
)
}
fn lex_ordering(exprs: Vec<PhysicalSortExpr>) -> LexOrdering {
LexOrdering::new(exprs).expect("expected non-empty ordering")
}
fn create_file(name: &str, ordering: Option<LexOrdering>) -> PartitionedFile {
PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering)
}
#[test]
fn test_derive_common_ordering_all_files_same_ordering() {
let ordering = lex_ordering(vec![
sort_expr("a", 0, false, true),
sort_expr("b", 1, true, false),
]);
let file_groups = vec![
FileGroup::new(vec![
create_file("f1.parquet", Some(ordering.clone())),
create_file("f2.parquet", Some(ordering.clone())),
]),
FileGroup::new(vec![create_file("f3.parquet", Some(ordering.clone()))]),
];
let result = derive_common_ordering_from_files(&file_groups);
assert_eq!(result, Some(ordering));
}
#[test]
fn test_derive_common_ordering_common_prefix() {
let ordering_abc = lex_ordering(vec![
sort_expr("a", 0, false, true),
sort_expr("b", 1, false, true),
sort_expr("c", 2, false, true),
]);
let ordering_ab = lex_ordering(vec![
sort_expr("a", 0, false, true),
sort_expr("b", 1, false, true),
]);
let file_groups = vec![FileGroup::new(vec![
create_file("f1.parquet", Some(ordering_abc)),
create_file("f2.parquet", Some(ordering_ab.clone())),
])];
let result = derive_common_ordering_from_files(&file_groups);
assert_eq!(result, Some(ordering_ab));
}
#[test]
fn test_derive_common_ordering_no_common_prefix() {
let ordering_a = lex_ordering(vec![sort_expr("a", 0, false, true)]);
let ordering_b = lex_ordering(vec![sort_expr("b", 1, false, true)]);
let file_groups = vec![FileGroup::new(vec![
create_file("f1.parquet", Some(ordering_a)),
create_file("f2.parquet", Some(ordering_b)),
])];
let result = derive_common_ordering_from_files(&file_groups);
assert_eq!(result, None);
}
#[test]
fn test_derive_common_ordering_mixed_with_none() {
let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);
let file_groups = vec![FileGroup::new(vec![
create_file("f1.parquet", Some(ordering)),
create_file("f2.parquet", None),
])];
let result = derive_common_ordering_from_files(&file_groups);
assert_eq!(result, None);
}
#[test]
fn test_derive_common_ordering_all_none() {
let file_groups = vec![FileGroup::new(vec![
create_file("f1.parquet", None),
create_file("f2.parquet", None),
])];
let result = derive_common_ordering_from_files(&file_groups);
assert_eq!(result, None);
}
#[test]
fn test_derive_common_ordering_empty_groups() {
let file_groups: Vec<FileGroup> = vec![];
let result = derive_common_ordering_from_files(&file_groups);
assert_eq!(result, None);
}
#[test]
fn test_derive_common_ordering_single_file() {
let ordering = lex_ordering(vec![
sort_expr("a", 0, false, true),
sort_expr("b", 1, true, false),
]);
let file_groups = vec![FileGroup::new(vec![create_file(
"f1.parquet",
Some(ordering.clone()),
)])];
let result = derive_common_ordering_from_files(&file_groups);
assert_eq!(result, Some(ordering));
}
}