Skip to main content

datafusion_catalog_listing/
table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::config::SchemaSource;
19use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
20use crate::{ListingOptions, ListingTableConfig};
21use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
22use async_trait::async_trait;
23use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
24use datafusion_common::stats::Precision;
25use datafusion_common::{
26    Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
27};
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_groups::FileGroup;
30use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig};
32#[expect(deprecated)]
33use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
34use datafusion_datasource::{
35    ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
36};
37use datafusion_execution::cache::TableScopedPath;
38use datafusion_execution::cache::cache_manager::FileStatisticsCache;
39use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
40use datafusion_expr::dml::InsertOp;
41use datafusion_expr::execution_props::ExecutionProps;
42use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
43use datafusion_physical_expr::create_lex_ordering;
44use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
45use datafusion_physical_expr_common::sort_expr::LexOrdering;
46use datafusion_physical_plan::ExecutionPlan;
47use datafusion_physical_plan::empty::EmptyExec;
48use futures::{Stream, StreamExt, TryStreamExt, future, stream};
49use object_store::ObjectStore;
50use std::any::Any;
51use std::collections::HashMap;
52use std::sync::Arc;
53
54/// Result of a file listing operation from [`ListingTable::list_files_for_scan`].
55#[derive(Debug)]
56pub struct ListFilesResult {
57    /// File groups organized by the partitioning strategy.
58    pub file_groups: Vec<FileGroup>,
59    /// Aggregated statistics for all files.
60    pub statistics: Statistics,
61    /// Whether files are grouped by partition values (enables Hash partitioning).
62    pub grouped_by_partition: bool,
63}
64
65/// Built in [`TableProvider`] that reads data from one or more files as a single table.
66///
67/// The files are read using an  [`ObjectStore`] instance, for example from
68/// local files or objects from AWS S3.
69///
70/// # Features:
71/// * Reading multiple files as a single table
72/// * Hive style partitioning (e.g., directories named `date=2024-06-01`)
73/// * Merges schemas from files with compatible but not identical schemas (see [`ListingTableConfig::file_schema`])
74/// * `limit`, `filter` and `projection` pushdown for formats that support it (e.g.,
75///   Parquet)
76/// * Statistics collection and pruning based on file metadata
77/// * Pre-existing sort order (see [`ListingOptions::file_sort_order`])
78/// * Metadata caching to speed up repeated queries (see [`FileMetadataCache`])
79/// * Statistics caching (see [`FileStatisticsCache`])
80///
81/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache
82///
83/// # Reading Directories and Hive Style Partitioning
84///
85/// For example, given the `table1` directory (or object store prefix)
86///
87/// ```text
88/// table1
89///  ├── file1.parquet
90///  └── file2.parquet
91/// ```
92///
93/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as
94/// a single table, merging the schemas if the files have compatible but not
95/// identical schemas.
96///
97/// Given the `table2` directory (or object store prefix)
98///
99/// ```text
100/// table2
101///  ├── date=2024-06-01
102///  │    ├── file3.parquet
103///  │    └── file4.parquet
104///  └── date=2024-06-02
105///       └── file5.parquet
106/// ```
107///
108/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and
109/// `file5.parquet` as a single table, again merging schemas if necessary.
110///
111/// Given the hive style partitioning structure (e.g,. directories named
112/// `date=2024-06-01` and `date=2026-06-02`), `ListingTable` also adds a `date`
113/// column when reading the table:
114/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01`
115/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`.
116///
117/// If the query has a predicate like `WHERE date = '2024-06-01'`
118/// only the corresponding directory will be read.
119///
120/// # See Also
121///
122/// 1. [`ListingTableConfig`]: Configuration options
123/// 1. [`DataSourceExec`]: `ExecutionPlan` used by `ListingTable`
124///
125/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
126///
127/// # Caching Metadata
128///
129/// Some formats, such as Parquet, use the `FileMetadataCache` to cache file
130/// metadata that is needed to execute but expensive to read, such as row
131/// groups and statistics. The cache is scoped to the `SessionContext` and can
132/// be configured via the [runtime config options].
133///
134/// [runtime config options]: https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings
135///
136/// # Example: Read a directory of parquet files using a [`ListingTable`]
137///
138/// ```no_run
139/// # use datafusion_common::Result;
140/// # use std::sync::Arc;
141/// # use datafusion_catalog::TableProvider;
142/// # use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
143/// # use datafusion_datasource::ListingTableUrl;
144/// # use datafusion_datasource_parquet::file_format::ParquetFormat;/// #
145/// # use datafusion_catalog::Session;
146/// async fn get_listing_table(session: &dyn Session) -> Result<Arc<dyn TableProvider>> {
147/// let table_path = "/path/to/parquet";
148///
149/// // Parse the path
150/// let table_path = ListingTableUrl::parse(table_path)?;
151///
152/// // Create default parquet options
153/// let file_format = ParquetFormat::new();
154/// let listing_options = ListingOptions::new(Arc::new(file_format))
155///   .with_file_extension(".parquet");
156///
157/// // Resolve the schema
158/// let resolved_schema = listing_options
159///    .infer_schema(session, &table_path)
160///    .await?;
161///
162/// let config = ListingTableConfig::new(table_path)
163///   .with_listing_options(listing_options)
164///   .with_schema(resolved_schema);
165///
166/// // Create a new TableProvider
167/// let provider = Arc::new(ListingTable::try_new(config)?);
168///
169/// # Ok(provider)
170/// # }
171/// ```
172#[derive(Debug, Clone)]
173pub struct ListingTable {
174    table_paths: Vec<ListingTableUrl>,
175    /// `file_schema` contains only the columns physically stored in the data files themselves.
176    ///     - Represents the actual fields found in files like Parquet, CSV, etc.
177    ///     - Used when reading the raw data from files
178    file_schema: SchemaRef,
179    /// `table_schema` combines `file_schema` + partition columns
180    ///     - Partition columns are derived from directory paths (not stored in files)
181    ///     - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet`
182    table_schema: SchemaRef,
183    /// Indicates how the schema was derived (inferred or explicitly specified)
184    schema_source: SchemaSource,
185    /// Options used to configure the listing table such as the file format
186    /// and partitioning information
187    options: ListingOptions,
188    /// The SQL definition for this table, if any
189    definition: Option<String>,
190    /// Cache for collected file statistics
191    collected_statistics: Arc<dyn FileStatisticsCache>,
192    /// Constraints applied to this table
193    constraints: Constraints,
194    /// Column default expressions for columns that are not physically present in the data files
195    column_defaults: HashMap<String, Expr>,
196    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
197    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
198}
199
200impl ListingTable {
201    /// Create new [`ListingTable`]
202    ///
203    /// See documentation and example on [`ListingTable`] and [`ListingTableConfig`]
204    pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result<Self> {
205        // Extract schema_source before moving other parts of the config
206        let schema_source = config.schema_source();
207
208        let file_schema = config
209            .file_schema
210            .ok_or_else(|| internal_datafusion_err!("No schema provided."))?;
211
212        let options = config
213            .options
214            .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;
215
216        // Add the partition columns to the file schema
217        let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
218        for (part_col_name, part_col_type) in &options.table_partition_cols {
219            builder.push(Field::new(part_col_name, part_col_type.clone(), false));
220        }
221
222        let table_schema = Arc::new(
223            builder
224                .finish()
225                .with_metadata(file_schema.metadata().clone()),
226        );
227
228        let table = Self {
229            table_paths: config.table_paths,
230            file_schema,
231            table_schema,
232            schema_source,
233            options,
234            definition: None,
235            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
236            constraints: Constraints::default(),
237            column_defaults: HashMap::new(),
238            expr_adapter_factory: config.expr_adapter_factory,
239        };
240
241        Ok(table)
242    }
243
244    /// Assign constraints
245    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
246        self.constraints = constraints;
247        self
248    }
249
250    /// Assign column defaults
251    pub fn with_column_defaults(
252        mut self,
253        column_defaults: HashMap<String, Expr>,
254    ) -> Self {
255        self.column_defaults = column_defaults;
256        self
257    }
258
259    /// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
260    ///
261    /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
262    /// multiple times in the same session.
263    ///
264    /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
265    pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
266        self.collected_statistics =
267            cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
268        self
269    }
270
271    /// Specify the SQL definition for this table, if any
272    pub fn with_definition(mut self, definition: Option<String>) -> Self {
273        self.definition = definition;
274        self
275    }
276
277    /// Get paths ref
278    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
279        &self.table_paths
280    }
281
282    /// Get options ref
283    pub fn options(&self) -> &ListingOptions {
284        &self.options
285    }
286
287    /// Get the schema source
288    pub fn schema_source(&self) -> SchemaSource {
289        self.schema_source
290    }
291
292    /// Deprecated: Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
293    ///
294    /// `SchemaAdapterFactory` has been removed. Use [`ListingTableConfig::with_expr_adapter_factory`]
295    /// and `PhysicalExprAdapterFactory` instead. See `upgrading.md` for more details.
296    ///
297    /// This method is a no-op and returns `self` unchanged.
298    #[deprecated(
299        since = "52.0.0",
300        note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
301    )]
302    #[expect(deprecated)]
303    pub fn with_schema_adapter_factory(
304        self,
305        _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
306    ) -> Self {
307        // No-op - just return self unchanged
308        self
309    }
310
311    /// Deprecated: Returns the [`SchemaAdapterFactory`] used by this [`ListingTable`].
312    ///
313    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
314    /// See `upgrading.md` for more details.
315    ///
316    /// Always returns `None`.
317    #[deprecated(
318        since = "52.0.0",
319        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
320    )]
321    #[expect(deprecated)]
322    pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
323        None
324    }
325
326    /// Creates a file source for this table
327    fn create_file_source(&self) -> Arc<dyn FileSource> {
328        let table_schema = TableSchema::new(
329            Arc::clone(&self.file_schema),
330            self.options
331                .table_partition_cols
332                .iter()
333                .map(|(col, field)| Arc::new(Field::new(col, field.clone(), false)))
334                .collect(),
335        );
336
337        self.options.format.file_source(table_schema)
338    }
339
340    /// Creates output ordering from user-specified file_sort_order or derives
341    /// from file orderings when user doesn't specify.
342    ///
343    /// If user specified `file_sort_order`, that takes precedence.
344    /// Otherwise, attempts to derive common ordering from file orderings in
345    /// the provided file groups.
346    pub fn try_create_output_ordering(
347        &self,
348        execution_props: &ExecutionProps,
349        file_groups: &[FileGroup],
350    ) -> datafusion_common::Result<Vec<LexOrdering>> {
351        // If user specified sort order, use that
352        if !self.options.file_sort_order.is_empty() {
353            return create_lex_ordering(
354                &self.table_schema,
355                &self.options.file_sort_order,
356                execution_props,
357            );
358        }
359        if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
360            return Ok(vec![ordering]);
361        }
362        Ok(vec![])
363    }
364}
365
366/// Derives a common ordering from file orderings across all file groups.
367///
368/// Returns the common ordering if all files have compatible orderings,
369/// otherwise returns None.
370///
371/// The function finds the longest common prefix among all file orderings.
372/// For example, if files have orderings `[a, b, c]` and `[a, b]`, the common
373/// ordering is `[a, b]`.
374fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
375    enum CurrentOrderingState {
376        /// Initial state before processing any files
377        FirstFile,
378        /// Some common ordering found so far
379        SomeOrdering(LexOrdering),
380        /// No files have ordering
381        NoOrdering,
382    }
383    let mut state = CurrentOrderingState::FirstFile;
384
385    // Collect file orderings and track counts
386    for group in file_groups {
387        for file in group.iter() {
388            state = match (&state, &file.ordering) {
389                // If this is the first file with ordering, set it as current
390                (CurrentOrderingState::FirstFile, Some(ordering)) => {
391                    CurrentOrderingState::SomeOrdering(ordering.clone())
392                }
393                (CurrentOrderingState::FirstFile, None) => {
394                    CurrentOrderingState::NoOrdering
395                }
396                // If we have an existing ordering, find common prefix with new ordering
397                (CurrentOrderingState::SomeOrdering(current), Some(ordering)) => {
398                    // Find common prefix between current and new ordering
399                    let prefix_len = current
400                        .as_ref()
401                        .iter()
402                        .zip(ordering.as_ref().iter())
403                        .take_while(|(a, b)| a == b)
404                        .count();
405                    if prefix_len == 0 {
406                        log::trace!(
407                            "Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
408                        );
409                        return None;
410                    } else {
411                        let ordering =
412                            LexOrdering::new(current.as_ref()[..prefix_len].to_vec())
413                                .expect("prefix_len > 0, so ordering must be valid");
414                        CurrentOrderingState::SomeOrdering(ordering)
415                    }
416                }
417                // If one file has ordering and another doesn't, no common ordering
418                // Return None and log a trace message explaining why
419                (CurrentOrderingState::SomeOrdering(ordering), None)
420                | (CurrentOrderingState::NoOrdering, Some(ordering)) => {
421                    log::trace!(
422                        "Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
423                    );
424                    return None;
425                }
426                // Both have no ordering, remain in NoOrdering state
427                (CurrentOrderingState::NoOrdering, None) => {
428                    CurrentOrderingState::NoOrdering
429                }
430            };
431        }
432    }
433
434    match state {
435        CurrentOrderingState::SomeOrdering(ordering) => Some(ordering),
436        _ => None,
437    }
438}
439
440// Expressions can be used for partition pruning if they can be evaluated using
441// only the partition columns and there are partition columns.
442fn can_be_evaluated_for_partition_pruning(
443    partition_column_names: &[&str],
444    expr: &Expr,
445) -> bool {
446    !partition_column_names.is_empty()
447        && expr_applicable_for_cols(partition_column_names, expr)
448}
449
450#[async_trait]
451impl TableProvider for ListingTable {
452    fn as_any(&self) -> &dyn Any {
453        self
454    }
455
456    fn schema(&self) -> SchemaRef {
457        Arc::clone(&self.table_schema)
458    }
459
460    fn constraints(&self) -> Option<&Constraints> {
461        Some(&self.constraints)
462    }
463
464    fn table_type(&self) -> TableType {
465        TableType::Base
466    }
467
468    async fn scan(
469        &self,
470        state: &dyn Session,
471        projection: Option<&Vec<usize>>,
472        filters: &[Expr],
473        limit: Option<usize>,
474    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
475        let options = ScanArgs::default()
476            .with_projection(projection.map(|p| p.as_slice()))
477            .with_filters(Some(filters))
478            .with_limit(limit);
479        Ok(self.scan_with_args(state, options).await?.into_inner())
480    }
481
482    async fn scan_with_args<'a>(
483        &self,
484        state: &dyn Session,
485        args: ScanArgs<'a>,
486    ) -> datafusion_common::Result<ScanResult> {
487        let projection = args.projection().map(|p| p.to_vec());
488        let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
489        let limit = args.limit();
490
491        // extract types of partition columns
492        let table_partition_cols = self
493            .options
494            .table_partition_cols
495            .iter()
496            .map(|col| Ok(Arc::new(self.table_schema.field_with_name(&col.0)?.clone())))
497            .collect::<datafusion_common::Result<Vec<_>>>()?;
498
499        let table_partition_col_names = table_partition_cols
500            .iter()
501            .map(|field| field.name().as_str())
502            .collect::<Vec<_>>();
503
504        // If the filters can be resolved using only partition cols, there is no need to
505        // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
506        let (partition_filters, filters): (Vec<_>, Vec<_>) =
507            filters.iter().cloned().partition(|filter| {
508                can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
509            });
510
511        // We should not limit the number of partitioned files to scan if there are filters and limit
512        // at the same time. This is because the limit should be applied after the filters are applied.
513        let statistic_file_limit = if filters.is_empty() { limit } else { None };
514
515        let ListFilesResult {
516            file_groups: mut partitioned_file_lists,
517            statistics,
518            grouped_by_partition: partitioned_by_file_group,
519        } = self
520            .list_files_for_scan(state, &partition_filters, statistic_file_limit)
521            .await?;
522
523        // if no files need to be read, return an `EmptyExec`
524        if partitioned_file_lists.is_empty() {
525            let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
526            return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
527        }
528
529        let output_ordering = self.try_create_output_ordering(
530            state.execution_props(),
531            &partitioned_file_lists,
532        )?;
533        match state
534            .config_options()
535            .execution
536            .split_file_groups_by_statistics
537            .then(|| {
538                output_ordering.first().map(|output_ordering| {
539                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
540                        &self.table_schema,
541                        &partitioned_file_lists,
542                        output_ordering,
543                        self.options.target_partitions,
544                    )
545                })
546            })
547            .flatten()
548        {
549            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
550            Some(Ok(new_groups)) => {
551                if new_groups.len() <= self.options.target_partitions {
552                    partitioned_file_lists = new_groups;
553                } else {
554                    log::debug!(
555                        "attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered"
556                    )
557                }
558            }
559            None => {} // no ordering required
560        };
561
562        let Some(object_store_url) =
563            self.table_paths.first().map(ListingTableUrl::object_store)
564        else {
565            return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
566                Schema::empty(),
567            )))));
568        };
569
570        let file_source = self.create_file_source();
571
572        // create the execution plan
573        let plan = self
574            .options
575            .format
576            .create_physical_plan(
577                state,
578                FileScanConfigBuilder::new(object_store_url, file_source)
579                    .with_file_groups(partitioned_file_lists)
580                    .with_constraints(self.constraints.clone())
581                    .with_statistics(statistics)
582                    .with_projection_indices(projection)?
583                    .with_limit(limit)
584                    .with_output_ordering(output_ordering)
585                    .with_expr_adapter(self.expr_adapter_factory.clone())
586                    .with_partitioned_by_file_group(partitioned_by_file_group)
587                    .build(),
588            )
589            .await?;
590
591        Ok(ScanResult::new(plan))
592    }
593
594    fn supports_filters_pushdown(
595        &self,
596        filters: &[&Expr],
597    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
598        let partition_column_names = self
599            .options
600            .table_partition_cols
601            .iter()
602            .map(|col| col.0.as_str())
603            .collect::<Vec<_>>();
604        filters
605            .iter()
606            .map(|filter| {
607                if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
608                {
609                    // if filter can be handled by partition pruning, it is exact
610                    return Ok(TableProviderFilterPushDown::Exact);
611                }
612
613                Ok(TableProviderFilterPushDown::Inexact)
614            })
615            .collect()
616    }
617
618    fn get_table_definition(&self) -> Option<&str> {
619        self.definition.as_deref()
620    }
621
622    async fn insert_into(
623        &self,
624        state: &dyn Session,
625        input: Arc<dyn ExecutionPlan>,
626        insert_op: InsertOp,
627    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
628        // Check that the schema of the plan matches the schema of this table.
629        self.schema()
630            .logically_equivalent_names_and_types(&input.schema())?;
631
632        let table_path = &self.table_paths()[0];
633        if !table_path.is_collection() {
634            return plan_err!(
635                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
636                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
637            );
638        }
639
640        // Get the object store for the table path.
641        let store = state.runtime_env().object_store(table_path)?;
642
643        let file_list_stream = pruned_partition_list(
644            state,
645            store.as_ref(),
646            table_path,
647            &[],
648            &self.options.file_extension,
649            &self.options.table_partition_cols,
650        )
651        .await?;
652
653        let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
654        let keep_partition_by_columns =
655            state.config_options().execution.keep_partition_by_columns;
656
657        // Invalidate cache entries for this table if they exist
658        if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
659            let key = TableScopedPath {
660                table: table_path.get_table_ref().clone(),
661                path: table_path.prefix().clone(),
662            };
663            let _ = lfc.remove(&key);
664        }
665
666        // Sink related option, apart from format
667        let config = FileSinkConfig {
668            original_url: String::default(),
669            object_store_url: self.table_paths()[0].object_store(),
670            table_paths: self.table_paths().clone(),
671            file_group,
672            output_schema: self.schema(),
673            table_partition_cols: self.options.table_partition_cols.clone(),
674            insert_op,
675            keep_partition_by_columns,
676            file_extension: self.options().format.get_ext(),
677            file_output_mode: FileOutputMode::Automatic,
678        };
679
680        // For writes, we only use user-specified ordering (no file groups to derive from)
681        let orderings = self.try_create_output_ordering(state.execution_props(), &[])?;
682        // It is sufficient to pass only one of the equivalent orderings:
683        let order_requirements = orderings.into_iter().next().map(Into::into);
684
685        self.options()
686            .format
687            .create_writer_physical_plan(input, state, config, order_requirements)
688            .await
689    }
690
691    fn get_column_default(&self, column: &str) -> Option<&Expr> {
692        self.column_defaults.get(column)
693    }
694}
695
696impl ListingTable {
697    /// Get the list of files for a scan as well as the file level statistics.
698    /// The list is grouped to let the execution plan know how the files should
699    /// be distributed to different threads / executors.
700    pub async fn list_files_for_scan<'a>(
701        &'a self,
702        ctx: &'a dyn Session,
703        filters: &'a [Expr],
704        limit: Option<usize>,
705    ) -> datafusion_common::Result<ListFilesResult> {
706        let store = if let Some(url) = self.table_paths.first() {
707            ctx.runtime_env().object_store(url)?
708        } else {
709            return Ok(ListFilesResult {
710                file_groups: vec![],
711                statistics: Statistics::new_unknown(&self.file_schema),
712                grouped_by_partition: false,
713            });
714        };
715        // list files (with partitions)
716        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
717            pruned_partition_list(
718                ctx,
719                store.as_ref(),
720                table_path,
721                filters,
722                &self.options.file_extension,
723                &self.options.table_partition_cols,
724            )
725        }))
726        .await?;
727        let meta_fetch_concurrency =
728            ctx.config_options().execution.meta_fetch_concurrency;
729        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
730        // collect the statistics and ordering if required by the config
731        let files = file_list
732            .map(|part_file| async {
733                let part_file = part_file?;
734                let (statistics, ordering) = if self.options.collect_stat {
735                    self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
736                        .await?
737                } else {
738                    (Arc::new(Statistics::new_unknown(&self.file_schema)), None)
739                };
740                Ok(part_file
741                    .with_statistics(statistics)
742                    .with_ordering(ordering))
743            })
744            .boxed()
745            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
746
747        let (file_group, inexact_stats) =
748            get_files_with_limit(files, limit, self.options.collect_stat).await?;
749
750        // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N
751        //
752        // When enabled, files are grouped by their Hive partition column values, allowing
753        // FileScanConfig to declare Hash partitioning. This enables the optimizer to skip
754        // hash repartitioning for aggregates and joins on partition columns.
755        let threshold = ctx.config_options().optimizer.preserve_file_partitions;
756
757        let (file_groups, grouped_by_partition) = if threshold > 0
758            && !self.options.table_partition_cols.is_empty()
759        {
760            let grouped =
761                file_group.group_by_partition_values(self.options.target_partitions);
762            if grouped.len() >= threshold {
763                (grouped, true)
764            } else {
765                let all_files: Vec<_> =
766                    grouped.into_iter().flat_map(|g| g.into_inner()).collect();
767                (
768                    FileGroup::new(all_files).split_files(self.options.target_partitions),
769                    false,
770                )
771            }
772        } else {
773            (
774                file_group.split_files(self.options.target_partitions),
775                false,
776            )
777        };
778
779        let (file_groups, stats) = compute_all_files_statistics(
780            file_groups,
781            self.schema(),
782            self.options.collect_stat,
783            inexact_stats,
784        )?;
785
786        // Note: Statistics already include both file columns and partition columns.
787        // PartitionedFile::with_statistics automatically appends exact partition column
788        // statistics (min=max=partition_value, null_count=0, distinct_count=1) computed
789        // from partition_values.
790        Ok(ListFilesResult {
791            file_groups,
792            statistics: stats,
793            grouped_by_partition,
794        })
795    }
796
797    /// Collects statistics and ordering for a given partitioned file.
798    ///
799    /// This method checks if statistics are cached. If cached, it returns the
800    /// cached statistics and infers ordering separately. If not cached, it infers
801    /// both statistics and ordering in a single metadata read for efficiency.
802    async fn do_collect_statistics_and_ordering(
803        &self,
804        ctx: &dyn Session,
805        store: &Arc<dyn ObjectStore>,
806        part_file: &PartitionedFile,
807    ) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
808        use datafusion_execution::cache::cache_manager::CachedFileMetadata;
809
810        let path = &part_file.object_meta.location;
811        let meta = &part_file.object_meta;
812
813        // Check cache first - if we have valid cached statistics and ordering
814        if let Some(cached) = self.collected_statistics.get(path)
815            && cached.is_valid_for(meta)
816        {
817            // Return cached statistics and ordering
818            return Ok((Arc::clone(&cached.statistics), cached.ordering.clone()));
819        }
820
821        // Cache miss or invalid: fetch both statistics and ordering in a single metadata read
822        let file_meta = self
823            .options
824            .format
825            .infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
826            .await?;
827
828        let statistics = Arc::new(file_meta.statistics);
829
830        // Store in cache
831        self.collected_statistics.put(
832            path,
833            CachedFileMetadata::new(
834                meta.clone(),
835                Arc::clone(&statistics),
836                file_meta.ordering.clone(),
837            ),
838        );
839
840        Ok((statistics, file_meta.ordering))
841    }
842}
843
844/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
845///
846/// This function collects files from the provided stream until either:
847/// 1. The stream is exhausted
848/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
849///
850/// # Arguments
851/// * `files` - A stream of `Result<PartitionedFile>` items to process
852/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
853///   once the accumulated number of rows exceeds this limit
854/// * `collect_stats` - Whether to collect and accumulate statistics from the files
855///
856/// # Returns
857/// A `Result` containing a `FileGroup` with the collected files
858/// and a boolean indicating whether the statistics are inexact.
859///
860/// # Note
861/// The function will continue processing files if statistics are not available or if the
862/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
863/// but files will still be collected.
864async fn get_files_with_limit(
865    files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
866    limit: Option<usize>,
867    collect_stats: bool,
868) -> datafusion_common::Result<(FileGroup, bool)> {
869    let mut file_group = FileGroup::default();
870    // Fusing the stream allows us to call next safely even once it is finished.
871    let mut all_files = Box::pin(files.fuse());
872    enum ProcessingState {
873        ReadingFiles,
874        ReachedLimit,
875    }
876
877    let mut state = ProcessingState::ReadingFiles;
878    let mut num_rows = Precision::Absent;
879
880    while let Some(file_result) = all_files.next().await {
881        // Early exit if we've already reached our limit
882        if matches!(state, ProcessingState::ReachedLimit) {
883            break;
884        }
885
886        let file = file_result?;
887
888        // Update file statistics regardless of state
889        if collect_stats && let Some(file_stats) = &file.statistics {
890            num_rows = if file_group.is_empty() {
891                // For the first file, just take its row count
892                file_stats.num_rows
893            } else {
894                // For subsequent files, accumulate the counts
895                num_rows.add(&file_stats.num_rows)
896            };
897        }
898
899        // Always add the file to our group
900        file_group.push(file);
901
902        // Check if we've hit the limit (if one was specified)
903        if let Some(limit) = limit
904            && let Precision::Exact(row_count) = num_rows
905            && row_count > limit
906        {
907            state = ProcessingState::ReachedLimit;
908        }
909    }
910    // If we still have files in the stream, it means that the limit kicked
911    // in, and the statistic could have been different had we processed the
912    // files in a different order.
913    let inexact_stats = all_files.next().await.is_some();
914    Ok((file_group, inexact_stats))
915}
916
917#[cfg(test)]
918mod tests {
919    use super::*;
920    use arrow::compute::SortOptions;
921    use datafusion_physical_expr::expressions::Column;
922    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
923    use std::sync::Arc;
924
925    /// Helper to create a PhysicalSortExpr
926    fn sort_expr(
927        name: &str,
928        idx: usize,
929        descending: bool,
930        nulls_first: bool,
931    ) -> PhysicalSortExpr {
932        PhysicalSortExpr::new(
933            Arc::new(Column::new(name, idx)),
934            SortOptions {
935                descending,
936                nulls_first,
937            },
938        )
939    }
940
941    /// Helper to create a LexOrdering (unwraps the Option)
942    fn lex_ordering(exprs: Vec<PhysicalSortExpr>) -> LexOrdering {
943        LexOrdering::new(exprs).expect("expected non-empty ordering")
944    }
945
946    /// Helper to create a PartitionedFile with optional ordering
947    fn create_file(name: &str, ordering: Option<LexOrdering>) -> PartitionedFile {
948        PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering)
949    }
950
951    #[test]
952    fn test_derive_common_ordering_all_files_same_ordering() {
953        // All files have the same ordering -> returns that ordering
954        let ordering = lex_ordering(vec![
955            sort_expr("a", 0, false, true),
956            sort_expr("b", 1, true, false),
957        ]);
958
959        let file_groups = vec![
960            FileGroup::new(vec![
961                create_file("f1.parquet", Some(ordering.clone())),
962                create_file("f2.parquet", Some(ordering.clone())),
963            ]),
964            FileGroup::new(vec![create_file("f3.parquet", Some(ordering.clone()))]),
965        ];
966
967        let result = derive_common_ordering_from_files(&file_groups);
968        assert_eq!(result, Some(ordering));
969    }
970
971    #[test]
972    fn test_derive_common_ordering_common_prefix() {
973        // Files have different orderings but share a common prefix
974        let ordering_abc = lex_ordering(vec![
975            sort_expr("a", 0, false, true),
976            sort_expr("b", 1, false, true),
977            sort_expr("c", 2, false, true),
978        ]);
979        let ordering_ab = lex_ordering(vec![
980            sort_expr("a", 0, false, true),
981            sort_expr("b", 1, false, true),
982        ]);
983
984        let file_groups = vec![FileGroup::new(vec![
985            create_file("f1.parquet", Some(ordering_abc)),
986            create_file("f2.parquet", Some(ordering_ab.clone())),
987        ])];
988
989        let result = derive_common_ordering_from_files(&file_groups);
990        assert_eq!(result, Some(ordering_ab));
991    }
992
993    #[test]
994    fn test_derive_common_ordering_no_common_prefix() {
995        // Files have completely different orderings -> returns None
996        let ordering_a = lex_ordering(vec![sort_expr("a", 0, false, true)]);
997        let ordering_b = lex_ordering(vec![sort_expr("b", 1, false, true)]);
998
999        let file_groups = vec![FileGroup::new(vec![
1000            create_file("f1.parquet", Some(ordering_a)),
1001            create_file("f2.parquet", Some(ordering_b)),
1002        ])];
1003
1004        let result = derive_common_ordering_from_files(&file_groups);
1005        assert_eq!(result, None);
1006    }
1007
1008    #[test]
1009    fn test_derive_common_ordering_mixed_with_none() {
1010        // Some files have ordering, some don't -> returns None
1011        let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);
1012
1013        let file_groups = vec![FileGroup::new(vec![
1014            create_file("f1.parquet", Some(ordering)),
1015            create_file("f2.parquet", None),
1016        ])];
1017
1018        let result = derive_common_ordering_from_files(&file_groups);
1019        assert_eq!(result, None);
1020    }
1021
1022    #[test]
1023    fn test_derive_common_ordering_all_none() {
1024        // No files have ordering -> returns None
1025        let file_groups = vec![FileGroup::new(vec![
1026            create_file("f1.parquet", None),
1027            create_file("f2.parquet", None),
1028        ])];
1029
1030        let result = derive_common_ordering_from_files(&file_groups);
1031        assert_eq!(result, None);
1032    }
1033
1034    #[test]
1035    fn test_derive_common_ordering_empty_groups() {
1036        // Empty file groups -> returns None
1037        let file_groups: Vec<FileGroup> = vec![];
1038        let result = derive_common_ordering_from_files(&file_groups);
1039        assert_eq!(result, None);
1040    }
1041
1042    #[test]
1043    fn test_derive_common_ordering_single_file() {
1044        // Single file with ordering -> returns that ordering
1045        let ordering = lex_ordering(vec![
1046            sort_expr("a", 0, false, true),
1047            sort_expr("b", 1, true, false),
1048        ]);
1049
1050        let file_groups = vec![FileGroup::new(vec![create_file(
1051            "f1.parquet",
1052            Some(ordering.clone()),
1053        )])];
1054
1055        let result = derive_common_ordering_from_files(&file_groups);
1056        assert_eq!(result, Some(ordering));
1057    }
1058}