Skip to main content

datafusion_catalog_listing/
table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::config::SchemaSource;
19use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
20use crate::{ListingOptions, ListingTableConfig};
21use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
22use async_trait::async_trait;
23use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
24use datafusion_common::stats::Precision;
25use datafusion_common::{
26    Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
27};
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_groups::FileGroup;
30use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig};
32#[expect(deprecated)]
33use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
34use datafusion_datasource::{
35    ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
36};
37use datafusion_execution::cache::TableScopedPath;
38use datafusion_execution::cache::cache_manager::FileStatisticsCache;
39use datafusion_expr::dml::InsertOp;
40use datafusion_expr::execution_props::ExecutionProps;
41use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
42use datafusion_physical_expr::create_lex_ordering;
43use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
44use datafusion_physical_expr_common::sort_expr::LexOrdering;
45use datafusion_physical_plan::ExecutionPlan;
46use datafusion_physical_plan::empty::EmptyExec;
47use futures::{Stream, StreamExt, TryStreamExt, future, stream};
48use object_store::ObjectStore;
49use std::collections::HashMap;
50use std::sync::Arc;
51
52/// Result of a file listing operation from [`ListingTable::list_files_for_scan`].
53#[derive(Debug)]
54pub struct ListFilesResult {
55    /// File groups organized by the partitioning strategy.
56    pub file_groups: Vec<FileGroup>,
57    /// Aggregated statistics for all files.
58    pub statistics: Statistics,
59    /// Whether files are grouped by partition values (enables Hash partitioning).
60    pub grouped_by_partition: bool,
61}
62
63/// Built in [`TableProvider`] that reads data from one or more files as a single table.
64///
65/// The files are read using an  [`ObjectStore`] instance, for example from
66/// local files or objects from AWS S3.
67///
68/// # Features:
69/// * Reading multiple files as a single table
70/// * Hive style partitioning (e.g., directories named `date=2024-06-01`)
71/// * Merges schemas from files with compatible but not identical schemas (see [`ListingTableConfig::file_schema`])
72/// * `limit`, `filter` and `projection` pushdown for formats that support it (e.g.,
73///   Parquet)
74/// * Statistics collection and pruning based on file metadata
75/// * Pre-existing sort order (see [`ListingOptions::file_sort_order`])
76/// * Metadata caching to speed up repeated queries (see [`FileMetadataCache`])
77/// * Statistics caching (see [`FileStatisticsCache`])
78///
79/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache
80///
81/// # Reading Directories and Hive Style Partitioning
82///
83/// For example, given the `table1` directory (or object store prefix)
84///
85/// ```text
86/// table1
87///  ├── file1.parquet
88///  └── file2.parquet
89/// ```
90///
91/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as
92/// a single table, merging the schemas if the files have compatible but not
93/// identical schemas.
94///
95/// Given the `table2` directory (or object store prefix)
96///
97/// ```text
98/// table2
99///  ├── date=2024-06-01
100///  │    ├── file3.parquet
101///  │    └── file4.parquet
102///  └── date=2024-06-02
103///       └── file5.parquet
104/// ```
105///
106/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and
107/// `file5.parquet` as a single table, again merging schemas if necessary.
108///
109/// Given the hive style partitioning structure (e.g,. directories named
110/// `date=2024-06-01` and `date=2026-06-02`), `ListingTable` also adds a `date`
111/// column when reading the table:
112/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01`
113/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`.
114///
115/// If the query has a predicate like `WHERE date = '2024-06-01'`
116/// only the corresponding directory will be read.
117///
118/// # See Also
119///
120/// 1. [`ListingTableConfig`]: Configuration options
121/// 1. [`DataSourceExec`]: `ExecutionPlan` used by `ListingTable`
122///
123/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
124///
125/// # Caching Metadata
126///
127/// Some formats, such as Parquet, use the `FileMetadataCache` to cache file
128/// metadata that is needed to execute but expensive to read, such as row
129/// groups and statistics. The cache is scoped to the `SessionContext` and can
130/// be configured via the [runtime config options].
131///
132/// [runtime config options]: https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings
133///
134/// # Example: Read a directory of parquet files using a [`ListingTable`]
135///
136/// ```no_run
137/// # use datafusion_common::Result;
138/// # use std::sync::Arc;
139/// # use datafusion_catalog::TableProvider;
140/// # use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
141/// # use datafusion_datasource::ListingTableUrl;
142/// # use datafusion_datasource_parquet::file_format::ParquetFormat;/// #
143/// # use datafusion_catalog::Session;
144/// async fn get_listing_table(session: &dyn Session) -> Result<Arc<dyn TableProvider>> {
145/// let table_path = "/path/to/parquet";
146///
147/// // Parse the path
148/// let table_path = ListingTableUrl::parse(table_path)?;
149///
150/// // Create default parquet options
151/// let file_format = ParquetFormat::new();
152/// let listing_options = ListingOptions::new(Arc::new(file_format))
153///   .with_file_extension(".parquet");
154///
155/// // Resolve the schema
156/// let resolved_schema = listing_options
157///    .infer_schema(session, &table_path)
158///    .await?;
159///
160/// let config = ListingTableConfig::new(table_path)
161///   .with_listing_options(listing_options)
162///   .with_schema(resolved_schema);
163///
164/// // Create a new TableProvider
165/// let provider = Arc::new(ListingTable::try_new(config)?);
166///
167/// # Ok(provider)
168/// # }
169/// ```
170#[derive(Debug, Clone)]
171pub struct ListingTable {
172    table_paths: Vec<ListingTableUrl>,
173    /// `file_schema` contains only the columns physically stored in the data files themselves.
174    ///     - Represents the actual fields found in files like Parquet, CSV, etc.
175    ///     - Used when reading the raw data from files
176    file_schema: SchemaRef,
177    /// `table_schema` combines `file_schema` + partition columns
178    ///     - Partition columns are derived from directory paths (not stored in files)
179    ///     - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet`
180    table_schema: SchemaRef,
181    /// Indicates how the schema was derived (inferred or explicitly specified)
182    schema_source: SchemaSource,
183    /// Options used to configure the listing table such as the file format
184    /// and partitioning information
185    options: ListingOptions,
186    /// The SQL definition for this table, if any
187    definition: Option<String>,
188    /// Cache for collected file statistics
189    collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
190    /// Constraints applied to this table
191    constraints: Constraints,
192    /// Column default expressions for columns that are not physically present in the data files
193    column_defaults: HashMap<String, Expr>,
194    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
195    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
196}
197
198impl ListingTable {
199    /// Create new [`ListingTable`]
200    ///
201    /// See documentation and example on [`ListingTable`] and [`ListingTableConfig`]
202    pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result<Self> {
203        // Extract schema_source before moving other parts of the config
204        let schema_source = config.schema_source();
205
206        let file_schema = config
207            .file_schema
208            .ok_or_else(|| internal_datafusion_err!("No schema provided."))?;
209
210        let options = config
211            .options
212            .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;
213
214        // Add the partition columns to the file schema
215        let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
216        for (part_col_name, part_col_type) in &options.table_partition_cols {
217            builder.push(Field::new(part_col_name, part_col_type.clone(), false));
218        }
219
220        let table_schema = Arc::new(
221            builder
222                .finish()
223                .with_metadata(file_schema.metadata().clone()),
224        );
225
226        let table = Self {
227            table_paths: config.table_paths,
228            file_schema,
229            table_schema,
230            schema_source,
231            options,
232            definition: None,
233            collected_statistics: None,
234            constraints: Constraints::default(),
235            column_defaults: HashMap::new(),
236            expr_adapter_factory: config.expr_adapter_factory,
237        };
238
239        Ok(table)
240    }
241
242    /// Assign constraints
243    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
244        self.constraints = constraints;
245        self
246    }
247
248    /// Assign column defaults
249    pub fn with_column_defaults(
250        mut self,
251        column_defaults: HashMap<String, Expr>,
252    ) -> Self {
253        self.column_defaults = column_defaults;
254        self
255    }
256
257    /// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
258    ///
259    /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
260    /// multiple times in the same session.
261    ///
262    pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
263        self.collected_statistics = cache;
264        self
265    }
266
267    /// Specify the SQL definition for this table, if any
268    pub fn with_definition(mut self, definition: Option<String>) -> Self {
269        self.definition = definition;
270        self
271    }
272
273    /// Get paths ref
274    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
275        &self.table_paths
276    }
277
278    /// Get options ref
279    pub fn options(&self) -> &ListingOptions {
280        &self.options
281    }
282
283    /// Get the schema source
284    pub fn schema_source(&self) -> SchemaSource {
285        self.schema_source
286    }
287
288    /// Deprecated: Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
289    ///
290    /// `SchemaAdapterFactory` has been removed. Use [`ListingTableConfig::with_expr_adapter_factory`]
291    /// and `PhysicalExprAdapterFactory` instead. See `upgrading.md` for more details.
292    ///
293    /// This method is a no-op and returns `self` unchanged.
294    #[deprecated(
295        since = "52.0.0",
296        note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
297    )]
298    #[expect(deprecated)]
299    pub fn with_schema_adapter_factory(
300        self,
301        _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
302    ) -> Self {
303        // No-op - just return self unchanged
304        self
305    }
306
307    /// Deprecated: Returns the [`SchemaAdapterFactory`] used by this [`ListingTable`].
308    ///
309    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
310    /// See `upgrading.md` for more details.
311    ///
312    /// Always returns `None`.
313    #[deprecated(
314        since = "52.0.0",
315        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
316    )]
317    #[expect(deprecated)]
318    pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
319        None
320    }
321
322    /// Creates a file source for this table
323    fn create_file_source(&self) -> Arc<dyn FileSource> {
324        let table_schema = TableSchema::new(
325            Arc::clone(&self.file_schema),
326            self.options
327                .table_partition_cols
328                .iter()
329                .map(|(col, field)| Arc::new(Field::new(col, field.clone(), false)))
330                .collect(),
331        );
332
333        self.options.format.file_source(table_schema)
334    }
335
336    /// Creates output ordering from user-specified file_sort_order or derives
337    /// from file orderings when user doesn't specify.
338    ///
339    /// If user specified `file_sort_order`, that takes precedence.
340    /// Otherwise, attempts to derive common ordering from file orderings in
341    /// the provided file groups.
342    pub fn try_create_output_ordering(
343        &self,
344        execution_props: &ExecutionProps,
345        file_groups: &[FileGroup],
346    ) -> datafusion_common::Result<Vec<LexOrdering>> {
347        // If user specified sort order, use that
348        if !self.options.file_sort_order.is_empty() {
349            return create_lex_ordering(
350                &self.table_schema,
351                &self.options.file_sort_order,
352                execution_props,
353            );
354        }
355        if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
356            return Ok(vec![ordering]);
357        }
358        Ok(vec![])
359    }
360}
361
362/// Derives a common ordering from file orderings across all file groups.
363///
364/// Returns the common ordering if all files have compatible orderings,
365/// otherwise returns None.
366///
367/// The function finds the longest common prefix among all file orderings.
368/// For example, if files have orderings `[a, b, c]` and `[a, b]`, the common
369/// ordering is `[a, b]`.
370fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
371    enum CurrentOrderingState {
372        /// Initial state before processing any files
373        FirstFile,
374        /// Some common ordering found so far
375        SomeOrdering(LexOrdering),
376        /// No files have ordering
377        NoOrdering,
378    }
379    let mut state = CurrentOrderingState::FirstFile;
380
381    // Collect file orderings and track counts
382    for group in file_groups {
383        for file in group.iter() {
384            state = match (&state, &file.ordering) {
385                // If this is the first file with ordering, set it as current
386                (CurrentOrderingState::FirstFile, Some(ordering)) => {
387                    CurrentOrderingState::SomeOrdering(ordering.clone())
388                }
389                (CurrentOrderingState::FirstFile, None) => {
390                    CurrentOrderingState::NoOrdering
391                }
392                // If we have an existing ordering, find common prefix with new ordering
393                (CurrentOrderingState::SomeOrdering(current), Some(ordering)) => {
394                    // Find common prefix between current and new ordering
395                    let prefix_len = current
396                        .as_ref()
397                        .iter()
398                        .zip(ordering.as_ref().iter())
399                        .take_while(|(a, b)| a == b)
400                        .count();
401                    if prefix_len == 0 {
402                        log::trace!(
403                            "Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
404                        );
405                        return None;
406                    } else {
407                        let ordering =
408                            LexOrdering::new(current.as_ref()[..prefix_len].to_vec())
409                                .expect("prefix_len > 0, so ordering must be valid");
410                        CurrentOrderingState::SomeOrdering(ordering)
411                    }
412                }
413                // If one file has ordering and another doesn't, no common ordering
414                // Return None and log a trace message explaining why
415                (CurrentOrderingState::SomeOrdering(ordering), None)
416                | (CurrentOrderingState::NoOrdering, Some(ordering)) => {
417                    log::trace!(
418                        "Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
419                    );
420                    return None;
421                }
422                // Both have no ordering, remain in NoOrdering state
423                (CurrentOrderingState::NoOrdering, None) => {
424                    CurrentOrderingState::NoOrdering
425                }
426            };
427        }
428    }
429
430    match state {
431        CurrentOrderingState::SomeOrdering(ordering) => Some(ordering),
432        _ => None,
433    }
434}
435
436// Expressions can be used for partition pruning if they can be evaluated using
437// only the partition columns and there are partition columns.
438fn can_be_evaluated_for_partition_pruning(
439    partition_column_names: &[&str],
440    expr: &Expr,
441) -> bool {
442    !partition_column_names.is_empty()
443        && expr_applicable_for_cols(partition_column_names, expr)
444}
445
446#[async_trait]
447impl TableProvider for ListingTable {
448    fn schema(&self) -> SchemaRef {
449        Arc::clone(&self.table_schema)
450    }
451
452    fn constraints(&self) -> Option<&Constraints> {
453        Some(&self.constraints)
454    }
455
456    fn table_type(&self) -> TableType {
457        TableType::Base
458    }
459
460    async fn scan(
461        &self,
462        state: &dyn Session,
463        projection: Option<&Vec<usize>>,
464        filters: &[Expr],
465        limit: Option<usize>,
466    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
467        let options = ScanArgs::default()
468            .with_projection(projection.map(|p| p.as_slice()))
469            .with_filters(Some(filters))
470            .with_limit(limit);
471        Ok(self.scan_with_args(state, options).await?.into_inner())
472    }
473
474    async fn scan_with_args<'a>(
475        &self,
476        state: &dyn Session,
477        args: ScanArgs<'a>,
478    ) -> datafusion_common::Result<ScanResult> {
479        let projection = args.projection().map(|p| p.to_vec());
480        let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
481        let limit = args.limit();
482
483        // extract types of partition columns
484        let table_partition_cols = self
485            .options
486            .table_partition_cols
487            .iter()
488            .map(|col| Ok(Arc::new(self.table_schema.field_with_name(&col.0)?.clone())))
489            .collect::<datafusion_common::Result<Vec<_>>>()?;
490
491        let table_partition_col_names = table_partition_cols
492            .iter()
493            .map(|field| field.name().as_str())
494            .collect::<Vec<_>>();
495
496        // If the filters can be resolved using only partition cols, there is no need to
497        // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
498        let (partition_filters, filters): (Vec<_>, Vec<_>) =
499            filters.iter().cloned().partition(|filter| {
500                can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
501            });
502
503        // We should not limit the number of partitioned files to scan if there are filters and limit
504        // at the same time. This is because the limit should be applied after the filters are applied.
505        let statistic_file_limit = if filters.is_empty() { limit } else { None };
506
507        let ListFilesResult {
508            file_groups: mut partitioned_file_lists,
509            statistics,
510            grouped_by_partition: partitioned_by_file_group,
511        } = self
512            .list_files_for_scan(state, &partition_filters, statistic_file_limit)
513            .await?;
514
515        // if no files need to be read, return an `EmptyExec`
516        if partitioned_file_lists.is_empty() {
517            let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
518            return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
519        }
520
521        let output_ordering = self.try_create_output_ordering(
522            state.execution_props(),
523            &partitioned_file_lists,
524        )?;
525        match state
526            .config_options()
527            .execution
528            .split_file_groups_by_statistics
529            .then(|| {
530                output_ordering.first().map(|output_ordering| {
531                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
532                        &self.table_schema,
533                        &partitioned_file_lists,
534                        output_ordering,
535                        self.options.target_partitions,
536                    )
537                })
538            })
539            .flatten()
540        {
541            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
542            Some(Ok(new_groups)) => {
543                if new_groups.len() <= self.options.target_partitions {
544                    partitioned_file_lists = new_groups;
545                } else {
546                    log::debug!(
547                        "attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered"
548                    )
549                }
550            }
551            None => {} // no ordering required
552        };
553
554        let Some(object_store_url) =
555            self.table_paths.first().map(ListingTableUrl::object_store)
556        else {
557            return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
558                Schema::empty(),
559            )))));
560        };
561
562        let file_source = self.create_file_source();
563
564        // create the execution plan
565        let plan = self
566            .options
567            .format
568            .create_physical_plan(
569                state,
570                FileScanConfigBuilder::new(object_store_url, file_source)
571                    .with_file_groups(partitioned_file_lists)
572                    .with_constraints(self.constraints.clone())
573                    .with_statistics(statistics)
574                    .with_projection_indices(projection)?
575                    .with_limit(limit)
576                    .with_output_ordering(output_ordering)
577                    .with_expr_adapter(self.expr_adapter_factory.clone())
578                    .with_partitioned_by_file_group(partitioned_by_file_group)
579                    .build(),
580            )
581            .await?;
582
583        Ok(ScanResult::new(plan))
584    }
585
586    fn supports_filters_pushdown(
587        &self,
588        filters: &[&Expr],
589    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
590        let partition_column_names = self
591            .options
592            .table_partition_cols
593            .iter()
594            .map(|col| col.0.as_str())
595            .collect::<Vec<_>>();
596        filters
597            .iter()
598            .map(|filter| {
599                if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
600                {
601                    // if filter can be handled by partition pruning, it is exact
602                    return Ok(TableProviderFilterPushDown::Exact);
603                }
604
605                Ok(TableProviderFilterPushDown::Inexact)
606            })
607            .collect()
608    }
609
610    fn get_table_definition(&self) -> Option<&str> {
611        self.definition.as_deref()
612    }
613
614    async fn insert_into(
615        &self,
616        state: &dyn Session,
617        input: Arc<dyn ExecutionPlan>,
618        insert_op: InsertOp,
619    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
620        // Check that the schema of the plan matches the schema of this table.
621        self.schema()
622            .logically_equivalent_names_and_types(&input.schema())?;
623
624        let table_path = &self.table_paths()[0];
625        if !table_path.is_collection() {
626            return plan_err!(
627                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
628                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
629            );
630        }
631
632        // Get the object store for the table path.
633        let store = state.runtime_env().object_store(table_path)?;
634
635        let file_list_stream = pruned_partition_list(
636            state,
637            store.as_ref(),
638            table_path,
639            &[],
640            &self.options.file_extension,
641            &self.options.table_partition_cols,
642        )
643        .await?;
644
645        let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
646        let keep_partition_by_columns =
647            state.config_options().execution.keep_partition_by_columns;
648
649        // Invalidate cache entries for this table if they exist
650        if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
651            let key = TableScopedPath {
652                table: table_path.get_table_ref().clone(),
653                path: table_path.prefix().clone(),
654            };
655            let _ = lfc.remove(&key);
656        }
657
658        // Sink related option, apart from format
659        let config = FileSinkConfig {
660            original_url: String::default(),
661            object_store_url: self.table_paths()[0].object_store(),
662            table_paths: self.table_paths().clone(),
663            file_group,
664            output_schema: self.schema(),
665            table_partition_cols: self.options.table_partition_cols.clone(),
666            insert_op,
667            keep_partition_by_columns,
668            file_extension: self.options().format.get_ext(),
669            file_output_mode: FileOutputMode::Automatic,
670        };
671
672        // For writes, we only use user-specified ordering (no file groups to derive from)
673        let orderings = self.try_create_output_ordering(state.execution_props(), &[])?;
674        // It is sufficient to pass only one of the equivalent orderings:
675        let order_requirements = orderings.into_iter().next().map(Into::into);
676
677        self.options()
678            .format
679            .create_writer_physical_plan(input, state, config, order_requirements)
680            .await
681    }
682
683    fn get_column_default(&self, column: &str) -> Option<&Expr> {
684        self.column_defaults.get(column)
685    }
686}
687
688impl ListingTable {
689    /// Get the list of files for a scan as well as the file level statistics.
690    /// The list is grouped to let the execution plan know how the files should
691    /// be distributed to different threads / executors.
692    pub async fn list_files_for_scan<'a>(
693        &'a self,
694        ctx: &'a dyn Session,
695        filters: &'a [Expr],
696        limit: Option<usize>,
697    ) -> datafusion_common::Result<ListFilesResult> {
698        let store = if let Some(url) = self.table_paths.first() {
699            ctx.runtime_env().object_store(url)?
700        } else {
701            return Ok(ListFilesResult {
702                file_groups: vec![],
703                statistics: Statistics::new_unknown(&self.file_schema),
704                grouped_by_partition: false,
705            });
706        };
707        // list files (with partitions)
708        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
709            pruned_partition_list(
710                ctx,
711                store.as_ref(),
712                table_path,
713                filters,
714                &self.options.file_extension,
715                &self.options.table_partition_cols,
716            )
717        }))
718        .await?;
719        let meta_fetch_concurrency =
720            ctx.config_options().execution.meta_fetch_concurrency;
721        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
722        // collect the statistics and ordering if required by the config
723        let files = file_list
724            .map(|part_file| async {
725                let part_file = part_file?;
726                let (statistics, ordering) = if self.options.collect_stat {
727                    self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
728                        .await?
729                } else {
730                    (Arc::new(Statistics::new_unknown(&self.file_schema)), None)
731                };
732                Ok(part_file
733                    .with_statistics(statistics)
734                    .with_ordering(ordering))
735            })
736            .boxed()
737            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
738
739        let (file_group, inexact_stats) =
740            get_files_with_limit(files, limit, self.options.collect_stat).await?;
741
742        // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N
743        //
744        // When enabled, files are grouped by their Hive partition column values, allowing
745        // FileScanConfig to declare Hash partitioning. This enables the optimizer to skip
746        // hash repartitioning for aggregates and joins on partition columns.
747        let threshold = ctx.config_options().optimizer.preserve_file_partitions;
748
749        let (file_groups, grouped_by_partition) = if threshold > 0
750            && !self.options.table_partition_cols.is_empty()
751        {
752            let grouped =
753                file_group.group_by_partition_values(self.options.target_partitions);
754            if grouped.len() >= threshold {
755                (grouped, true)
756            } else {
757                let all_files: Vec<_> =
758                    grouped.into_iter().flat_map(|g| g.into_inner()).collect();
759                (
760                    FileGroup::new(all_files).split_files(self.options.target_partitions),
761                    false,
762                )
763            }
764        } else {
765            (
766                file_group.split_files(self.options.target_partitions),
767                false,
768            )
769        };
770
771        let (file_groups, stats) = compute_all_files_statistics(
772            file_groups,
773            self.schema(),
774            self.options.collect_stat,
775            inexact_stats,
776        )?;
777
778        // Note: Statistics already include both file columns and partition columns.
779        // PartitionedFile::with_statistics automatically appends exact partition column
780        // statistics (min=max=partition_value, null_count=0, distinct_count=1) computed
781        // from partition_values.
782        Ok(ListFilesResult {
783            file_groups,
784            statistics: stats,
785            grouped_by_partition,
786        })
787    }
788
789    /// Collects statistics and ordering for a given partitioned file.
790    ///
791    /// This method checks if statistics are cached. If cached, it returns the
792    /// cached statistics and infers ordering separately. If not cached, it infers
793    /// both statistics and ordering in a single metadata read for efficiency.
794    async fn do_collect_statistics_and_ordering(
795        &self,
796        ctx: &dyn Session,
797        store: &Arc<dyn ObjectStore>,
798        part_file: &PartitionedFile,
799    ) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
800        use datafusion_execution::cache::cache_manager::CachedFileMetadata;
801
802        let path = TableScopedPath {
803            table: part_file.table_reference.clone(),
804            path: part_file.object_meta.location.clone(),
805        };
806        let meta = &part_file.object_meta;
807
808        // Check cache first - if we have valid cached statistics and ordering
809        if let Some(cache) = &self.collected_statistics
810            && let Some(cached) = cache.get(&path)
811            && cached.is_valid_for(meta)
812        {
813            // Return cached statistics and ordering
814            return Ok((Arc::clone(&cached.statistics), cached.ordering.clone()));
815        }
816
817        // Cache miss or invalid: fetch both statistics and ordering in a single metadata read
818        let file_meta = self
819            .options
820            .format
821            .infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
822            .await?;
823
824        let statistics = Arc::new(file_meta.statistics);
825
826        // Store in cache
827        if let Some(cache) = &self.collected_statistics {
828            cache.put(
829                &path,
830                CachedFileMetadata::new(
831                    meta.clone(),
832                    Arc::clone(&statistics),
833                    file_meta.ordering.clone(),
834                ),
835            );
836        }
837
838        Ok((statistics, file_meta.ordering))
839    }
840}
841
842/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
843///
844/// This function collects files from the provided stream until either:
845/// 1. The stream is exhausted
846/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
847///
848/// # Arguments
849/// * `files` - A stream of `Result<PartitionedFile>` items to process
850/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
851///   once the accumulated number of rows exceeds this limit
852/// * `collect_stats` - Whether to collect and accumulate statistics from the files
853///
854/// # Returns
855/// A `Result` containing a `FileGroup` with the collected files
856/// and a boolean indicating whether the statistics are inexact.
857///
858/// # Note
859/// The function will continue processing files if statistics are not available or if the
860/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
861/// but files will still be collected.
862async fn get_files_with_limit(
863    files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
864    limit: Option<usize>,
865    collect_stats: bool,
866) -> datafusion_common::Result<(FileGroup, bool)> {
867    let mut file_group = FileGroup::default();
868    // Fusing the stream allows us to call next safely even once it is finished.
869    let mut all_files = Box::pin(files.fuse());
870    enum ProcessingState {
871        ReadingFiles,
872        ReachedLimit,
873    }
874
875    let mut state = ProcessingState::ReadingFiles;
876    let mut num_rows = Precision::Absent;
877
878    while let Some(file_result) = all_files.next().await {
879        // Early exit if we've already reached our limit
880        if matches!(state, ProcessingState::ReachedLimit) {
881            break;
882        }
883
884        let file = file_result?;
885
886        // Update file statistics regardless of state
887        if collect_stats && let Some(file_stats) = &file.statistics {
888            num_rows = if file_group.is_empty() {
889                // For the first file, just take its row count
890                file_stats.num_rows
891            } else {
892                // For subsequent files, accumulate the counts
893                num_rows.add(&file_stats.num_rows)
894            };
895        }
896
897        // Always add the file to our group
898        file_group.push(file);
899
900        // Check if we've hit the limit (if one was specified)
901        if let Some(limit) = limit
902            && let Precision::Exact(row_count) = num_rows
903            && row_count > limit
904        {
905            state = ProcessingState::ReachedLimit;
906        }
907    }
908    // If we still have files in the stream, it means that the limit kicked
909    // in, and the statistic could have been different had we processed the
910    // files in a different order.
911    let inexact_stats = all_files.next().await.is_some();
912    Ok((file_group, inexact_stats))
913}
914
915#[cfg(test)]
916mod tests {
917    use super::*;
918    use arrow::compute::SortOptions;
919    use datafusion_physical_expr::expressions::Column;
920    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
921
922    /// Helper to create a PhysicalSortExpr
923    fn sort_expr(
924        name: &str,
925        idx: usize,
926        descending: bool,
927        nulls_first: bool,
928    ) -> PhysicalSortExpr {
929        PhysicalSortExpr::new(
930            Arc::new(Column::new(name, idx)),
931            SortOptions {
932                descending,
933                nulls_first,
934            },
935        )
936    }
937
938    /// Helper to create a LexOrdering (unwraps the Option)
939    fn lex_ordering(exprs: Vec<PhysicalSortExpr>) -> LexOrdering {
940        LexOrdering::new(exprs).expect("expected non-empty ordering")
941    }
942
943    /// Helper to create a PartitionedFile with optional ordering
944    fn create_file(name: &str, ordering: Option<LexOrdering>) -> PartitionedFile {
945        PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering)
946    }
947
948    #[test]
949    fn test_derive_common_ordering_all_files_same_ordering() {
950        // All files have the same ordering -> returns that ordering
951        let ordering = lex_ordering(vec![
952            sort_expr("a", 0, false, true),
953            sort_expr("b", 1, true, false),
954        ]);
955
956        let file_groups = vec![
957            FileGroup::new(vec![
958                create_file("f1.parquet", Some(ordering.clone())),
959                create_file("f2.parquet", Some(ordering.clone())),
960            ]),
961            FileGroup::new(vec![create_file("f3.parquet", Some(ordering.clone()))]),
962        ];
963
964        let result = derive_common_ordering_from_files(&file_groups);
965        assert_eq!(result, Some(ordering));
966    }
967
968    #[test]
969    fn test_derive_common_ordering_common_prefix() {
970        // Files have different orderings but share a common prefix
971        let ordering_abc = lex_ordering(vec![
972            sort_expr("a", 0, false, true),
973            sort_expr("b", 1, false, true),
974            sort_expr("c", 2, false, true),
975        ]);
976        let ordering_ab = lex_ordering(vec![
977            sort_expr("a", 0, false, true),
978            sort_expr("b", 1, false, true),
979        ]);
980
981        let file_groups = vec![FileGroup::new(vec![
982            create_file("f1.parquet", Some(ordering_abc)),
983            create_file("f2.parquet", Some(ordering_ab.clone())),
984        ])];
985
986        let result = derive_common_ordering_from_files(&file_groups);
987        assert_eq!(result, Some(ordering_ab));
988    }
989
990    #[test]
991    fn test_derive_common_ordering_no_common_prefix() {
992        // Files have completely different orderings -> returns None
993        let ordering_a = lex_ordering(vec![sort_expr("a", 0, false, true)]);
994        let ordering_b = lex_ordering(vec![sort_expr("b", 1, false, true)]);
995
996        let file_groups = vec![FileGroup::new(vec![
997            create_file("f1.parquet", Some(ordering_a)),
998            create_file("f2.parquet", Some(ordering_b)),
999        ])];
1000
1001        let result = derive_common_ordering_from_files(&file_groups);
1002        assert_eq!(result, None);
1003    }
1004
1005    #[test]
1006    fn test_derive_common_ordering_mixed_with_none() {
1007        // Some files have ordering, some don't -> returns None
1008        let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);
1009
1010        let file_groups = vec![FileGroup::new(vec![
1011            create_file("f1.parquet", Some(ordering)),
1012            create_file("f2.parquet", None),
1013        ])];
1014
1015        let result = derive_common_ordering_from_files(&file_groups);
1016        assert_eq!(result, None);
1017    }
1018
1019    #[test]
1020    fn test_derive_common_ordering_all_none() {
1021        // No files have ordering -> returns None
1022        let file_groups = vec![FileGroup::new(vec![
1023            create_file("f1.parquet", None),
1024            create_file("f2.parquet", None),
1025        ])];
1026
1027        let result = derive_common_ordering_from_files(&file_groups);
1028        assert_eq!(result, None);
1029    }
1030
1031    #[test]
1032    fn test_derive_common_ordering_empty_groups() {
1033        // Empty file groups -> returns None
1034        let file_groups: Vec<FileGroup> = vec![];
1035        let result = derive_common_ordering_from_files(&file_groups);
1036        assert_eq!(result, None);
1037    }
1038
1039    #[test]
1040    fn test_derive_common_ordering_single_file() {
1041        // Single file with ordering -> returns that ordering
1042        let ordering = lex_ordering(vec![
1043            sort_expr("a", 0, false, true),
1044            sort_expr("b", 1, true, false),
1045        ]);
1046
1047        let file_groups = vec![FileGroup::new(vec![create_file(
1048            "f1.parquet",
1049            Some(ordering.clone()),
1050        )])];
1051
1052        let result = derive_common_ordering_from_files(&file_groups);
1053        assert_eq!(result, Some(ordering));
1054    }
1055}