datafusion_catalog_listing/
table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::config::SchemaSource;
19use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
20use crate::{ListingOptions, ListingTableConfig};
21use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
22use async_trait::async_trait;
23use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
24use datafusion_common::stats::Precision;
25use datafusion_common::{
26    Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
27};
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_groups::FileGroup;
30use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31use datafusion_datasource::file_sink_config::FileSinkConfig;
32#[expect(deprecated)]
33use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
34use datafusion_datasource::{
35    ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
36};
37use datafusion_execution::cache::TableScopedPath;
38use datafusion_execution::cache::cache_manager::FileStatisticsCache;
39use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
40use datafusion_expr::dml::InsertOp;
41use datafusion_expr::execution_props::ExecutionProps;
42use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
43use datafusion_physical_expr::create_lex_ordering;
44use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
45use datafusion_physical_expr_common::sort_expr::LexOrdering;
46use datafusion_physical_plan::ExecutionPlan;
47use datafusion_physical_plan::empty::EmptyExec;
48use futures::{Stream, StreamExt, TryStreamExt, future, stream};
49use object_store::ObjectStore;
50use std::any::Any;
51use std::collections::HashMap;
52use std::sync::Arc;
53
54/// Result of a file listing operation from [`ListingTable::list_files_for_scan`].
55#[derive(Debug)]
56pub struct ListFilesResult {
57    /// File groups organized by the partitioning strategy.
58    pub file_groups: Vec<FileGroup>,
59    /// Aggregated statistics for all files.
60    pub statistics: Statistics,
61    /// Whether files are grouped by partition values (enables Hash partitioning).
62    pub grouped_by_partition: bool,
63}
64
65/// Built in [`TableProvider`] that reads data from one or more files as a single table.
66///
67/// The files are read using an  [`ObjectStore`] instance, for example from
68/// local files or objects from AWS S3.
69///
70/// # Features:
71/// * Reading multiple files as a single table
72/// * Hive style partitioning (e.g., directories named `date=2024-06-01`)
73/// * Merges schemas from files with compatible but not identical schemas (see [`ListingTableConfig::file_schema`])
74/// * `limit`, `filter` and `projection` pushdown for formats that support it (e.g.,
75///   Parquet)
76/// * Statistics collection and pruning based on file metadata
77/// * Pre-existing sort order (see [`ListingOptions::file_sort_order`])
78/// * Metadata caching to speed up repeated queries (see [`FileMetadataCache`])
79/// * Statistics caching (see [`FileStatisticsCache`])
80///
81/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache
82///
83/// # Reading Directories and Hive Style Partitioning
84///
85/// For example, given the `table1` directory (or object store prefix)
86///
87/// ```text
88/// table1
89///  ├── file1.parquet
90///  └── file2.parquet
91/// ```
92///
93/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as
94/// a single table, merging the schemas if the files have compatible but not
95/// identical schemas.
96///
97/// Given the `table2` directory (or object store prefix)
98///
99/// ```text
100/// table2
101///  ├── date=2024-06-01
102///  │    ├── file3.parquet
103///  │    └── file4.parquet
104///  └── date=2024-06-02
105///       └── file5.parquet
106/// ```
107///
108/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and
109/// `file5.parquet` as a single table, again merging schemas if necessary.
110///
111/// Given the hive style partitioning structure (e.g,. directories named
112/// `date=2024-06-01` and `date=2026-06-02`), `ListingTable` also adds a `date`
113/// column when reading the table:
114/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01`
115/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`.
116///
117/// If the query has a predicate like `WHERE date = '2024-06-01'`
118/// only the corresponding directory will be read.
119///
120/// # See Also
121///
122/// 1. [`ListingTableConfig`]: Configuration options
123/// 1. [`DataSourceExec`]: `ExecutionPlan` used by `ListingTable`
124///
125/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
126///
127/// # Caching Metadata
128///
129/// Some formats, such as Parquet, use the `FileMetadataCache` to cache file
130/// metadata that is needed to execute but expensive to read, such as row
131/// groups and statistics. The cache is scoped to the `SessionContext` and can
132/// be configured via the [runtime config options].
133///
134/// [runtime config options]: https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings
135///
136/// # Example: Read a directory of parquet files using a [`ListingTable`]
137///
138/// ```no_run
139/// # use datafusion_common::Result;
140/// # use std::sync::Arc;
141/// # use datafusion_catalog::TableProvider;
142/// # use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
143/// # use datafusion_datasource::ListingTableUrl;
144/// # use datafusion_datasource_parquet::file_format::ParquetFormat;/// #
145/// # use datafusion_catalog::Session;
146/// async fn get_listing_table(session: &dyn Session) -> Result<Arc<dyn TableProvider>> {
147/// let table_path = "/path/to/parquet";
148///
149/// // Parse the path
150/// let table_path = ListingTableUrl::parse(table_path)?;
151///
152/// // Create default parquet options
153/// let file_format = ParquetFormat::new();
154/// let listing_options = ListingOptions::new(Arc::new(file_format))
155///   .with_file_extension(".parquet");
156///
157/// // Resolve the schema
158/// let resolved_schema = listing_options
159///    .infer_schema(session, &table_path)
160///    .await?;
161///
162/// let config = ListingTableConfig::new(table_path)
163///   .with_listing_options(listing_options)
164///   .with_schema(resolved_schema);
165///
166/// // Create a new TableProvider
167/// let provider = Arc::new(ListingTable::try_new(config)?);
168///
169/// # Ok(provider)
170/// # }
171/// ```
172#[derive(Debug, Clone)]
173pub struct ListingTable {
174    table_paths: Vec<ListingTableUrl>,
175    /// `file_schema` contains only the columns physically stored in the data files themselves.
176    ///     - Represents the actual fields found in files like Parquet, CSV, etc.
177    ///     - Used when reading the raw data from files
178    file_schema: SchemaRef,
179    /// `table_schema` combines `file_schema` + partition columns
180    ///     - Partition columns are derived from directory paths (not stored in files)
181    ///     - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet`
182    table_schema: SchemaRef,
183    /// Indicates how the schema was derived (inferred or explicitly specified)
184    schema_source: SchemaSource,
185    /// Options used to configure the listing table such as the file format
186    /// and partitioning information
187    options: ListingOptions,
188    /// The SQL definition for this table, if any
189    definition: Option<String>,
190    /// Cache for collected file statistics
191    collected_statistics: Arc<dyn FileStatisticsCache>,
192    /// Constraints applied to this table
193    constraints: Constraints,
194    /// Column default expressions for columns that are not physically present in the data files
195    column_defaults: HashMap<String, Expr>,
196    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
197    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
198}
199
200impl ListingTable {
201    /// Create new [`ListingTable`]
202    ///
203    /// See documentation and example on [`ListingTable`] and [`ListingTableConfig`]
204    pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result<Self> {
205        // Extract schema_source before moving other parts of the config
206        let schema_source = config.schema_source();
207
208        let file_schema = config
209            .file_schema
210            .ok_or_else(|| internal_datafusion_err!("No schema provided."))?;
211
212        let options = config
213            .options
214            .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;
215
216        // Add the partition columns to the file schema
217        let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
218        for (part_col_name, part_col_type) in &options.table_partition_cols {
219            builder.push(Field::new(part_col_name, part_col_type.clone(), false));
220        }
221
222        let table_schema = Arc::new(
223            builder
224                .finish()
225                .with_metadata(file_schema.metadata().clone()),
226        );
227
228        let table = Self {
229            table_paths: config.table_paths,
230            file_schema,
231            table_schema,
232            schema_source,
233            options,
234            definition: None,
235            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
236            constraints: Constraints::default(),
237            column_defaults: HashMap::new(),
238            expr_adapter_factory: config.expr_adapter_factory,
239        };
240
241        Ok(table)
242    }
243
244    /// Assign constraints
245    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
246        self.constraints = constraints;
247        self
248    }
249
250    /// Assign column defaults
251    pub fn with_column_defaults(
252        mut self,
253        column_defaults: HashMap<String, Expr>,
254    ) -> Self {
255        self.column_defaults = column_defaults;
256        self
257    }
258
259    /// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
260    ///
261    /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
262    /// multiple times in the same session.
263    ///
264    /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
265    pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
266        self.collected_statistics =
267            cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
268        self
269    }
270
271    /// Specify the SQL definition for this table, if any
272    pub fn with_definition(mut self, definition: Option<String>) -> Self {
273        self.definition = definition;
274        self
275    }
276
277    /// Get paths ref
278    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
279        &self.table_paths
280    }
281
282    /// Get options ref
283    pub fn options(&self) -> &ListingOptions {
284        &self.options
285    }
286
287    /// Get the schema source
288    pub fn schema_source(&self) -> SchemaSource {
289        self.schema_source
290    }
291
292    /// Deprecated: Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
293    ///
294    /// `SchemaAdapterFactory` has been removed. Use [`ListingTableConfig::with_expr_adapter_factory`]
295    /// and `PhysicalExprAdapterFactory` instead. See `upgrading.md` for more details.
296    ///
297    /// This method is a no-op and returns `self` unchanged.
298    #[deprecated(
299        since = "52.0.0",
300        note = "SchemaAdapterFactory has been removed. Use ListingTableConfig::with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
301    )]
302    #[expect(deprecated)]
303    pub fn with_schema_adapter_factory(
304        self,
305        _schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
306    ) -> Self {
307        // No-op - just return self unchanged
308        self
309    }
310
311    /// Deprecated: Returns the [`SchemaAdapterFactory`] used by this [`ListingTable`].
312    ///
313    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
314    /// See `upgrading.md` for more details.
315    ///
316    /// Always returns `None`.
317    #[deprecated(
318        since = "52.0.0",
319        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
320    )]
321    #[expect(deprecated)]
322    pub fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
323        None
324    }
325
326    /// Creates a file source for this table
327    fn create_file_source(&self) -> Arc<dyn FileSource> {
328        let table_schema = TableSchema::new(
329            Arc::clone(&self.file_schema),
330            self.options
331                .table_partition_cols
332                .iter()
333                .map(|(col, field)| Arc::new(Field::new(col, field.clone(), false)))
334                .collect(),
335        );
336
337        self.options.format.file_source(table_schema)
338    }
339
340    /// If file_sort_order is specified, creates the appropriate physical expressions
341    pub fn try_create_output_ordering(
342        &self,
343        execution_props: &ExecutionProps,
344    ) -> datafusion_common::Result<Vec<LexOrdering>> {
345        create_lex_ordering(
346            &self.table_schema,
347            &self.options.file_sort_order,
348            execution_props,
349        )
350    }
351}
352
353// Expressions can be used for partition pruning if they can be evaluated using
354// only the partition columns and there are partition columns.
355fn can_be_evaluated_for_partition_pruning(
356    partition_column_names: &[&str],
357    expr: &Expr,
358) -> bool {
359    !partition_column_names.is_empty()
360        && expr_applicable_for_cols(partition_column_names, expr)
361}
362
363#[async_trait]
364impl TableProvider for ListingTable {
365    fn as_any(&self) -> &dyn Any {
366        self
367    }
368
369    fn schema(&self) -> SchemaRef {
370        Arc::clone(&self.table_schema)
371    }
372
373    fn constraints(&self) -> Option<&Constraints> {
374        Some(&self.constraints)
375    }
376
377    fn table_type(&self) -> TableType {
378        TableType::Base
379    }
380
381    async fn scan(
382        &self,
383        state: &dyn Session,
384        projection: Option<&Vec<usize>>,
385        filters: &[Expr],
386        limit: Option<usize>,
387    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
388        let options = ScanArgs::default()
389            .with_projection(projection.map(|p| p.as_slice()))
390            .with_filters(Some(filters))
391            .with_limit(limit);
392        Ok(self.scan_with_args(state, options).await?.into_inner())
393    }
394
395    async fn scan_with_args<'a>(
396        &self,
397        state: &dyn Session,
398        args: ScanArgs<'a>,
399    ) -> datafusion_common::Result<ScanResult> {
400        let projection = args.projection().map(|p| p.to_vec());
401        let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
402        let limit = args.limit();
403
404        // extract types of partition columns
405        let table_partition_cols = self
406            .options
407            .table_partition_cols
408            .iter()
409            .map(|col| Ok(Arc::new(self.table_schema.field_with_name(&col.0)?.clone())))
410            .collect::<datafusion_common::Result<Vec<_>>>()?;
411
412        let table_partition_col_names = table_partition_cols
413            .iter()
414            .map(|field| field.name().as_str())
415            .collect::<Vec<_>>();
416
417        // If the filters can be resolved using only partition cols, there is no need to
418        // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
419        let (partition_filters, filters): (Vec<_>, Vec<_>) =
420            filters.iter().cloned().partition(|filter| {
421                can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
422            });
423
424        // We should not limit the number of partitioned files to scan if there are filters and limit
425        // at the same time. This is because the limit should be applied after the filters are applied.
426        let statistic_file_limit = if filters.is_empty() { limit } else { None };
427
428        let ListFilesResult {
429            file_groups: mut partitioned_file_lists,
430            statistics,
431            grouped_by_partition: partitioned_by_file_group,
432        } = self
433            .list_files_for_scan(state, &partition_filters, statistic_file_limit)
434            .await?;
435
436        // if no files need to be read, return an `EmptyExec`
437        if partitioned_file_lists.is_empty() {
438            let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
439            return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
440        }
441
442        let output_ordering = self.try_create_output_ordering(state.execution_props())?;
443        match state
444            .config_options()
445            .execution
446            .split_file_groups_by_statistics
447            .then(|| {
448                output_ordering.first().map(|output_ordering| {
449                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
450                        &self.table_schema,
451                        &partitioned_file_lists,
452                        output_ordering,
453                        self.options.target_partitions,
454                    )
455                })
456            })
457            .flatten()
458        {
459            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
460            Some(Ok(new_groups)) => {
461                if new_groups.len() <= self.options.target_partitions {
462                    partitioned_file_lists = new_groups;
463                } else {
464                    log::debug!(
465                        "attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered"
466                    )
467                }
468            }
469            None => {} // no ordering required
470        };
471
472        let Some(object_store_url) =
473            self.table_paths.first().map(ListingTableUrl::object_store)
474        else {
475            return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
476                Schema::empty(),
477            )))));
478        };
479
480        let file_source = self.create_file_source();
481
482        // create the execution plan
483        let plan = self
484            .options
485            .format
486            .create_physical_plan(
487                state,
488                FileScanConfigBuilder::new(object_store_url, file_source)
489                    .with_file_groups(partitioned_file_lists)
490                    .with_constraints(self.constraints.clone())
491                    .with_statistics(statistics)
492                    .with_projection_indices(projection)?
493                    .with_limit(limit)
494                    .with_output_ordering(output_ordering)
495                    .with_expr_adapter(self.expr_adapter_factory.clone())
496                    .with_partitioned_by_file_group(partitioned_by_file_group)
497                    .build(),
498            )
499            .await?;
500
501        Ok(ScanResult::new(plan))
502    }
503
504    fn supports_filters_pushdown(
505        &self,
506        filters: &[&Expr],
507    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
508        let partition_column_names = self
509            .options
510            .table_partition_cols
511            .iter()
512            .map(|col| col.0.as_str())
513            .collect::<Vec<_>>();
514        filters
515            .iter()
516            .map(|filter| {
517                if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
518                {
519                    // if filter can be handled by partition pruning, it is exact
520                    return Ok(TableProviderFilterPushDown::Exact);
521                }
522
523                Ok(TableProviderFilterPushDown::Inexact)
524            })
525            .collect()
526    }
527
528    fn get_table_definition(&self) -> Option<&str> {
529        self.definition.as_deref()
530    }
531
532    async fn insert_into(
533        &self,
534        state: &dyn Session,
535        input: Arc<dyn ExecutionPlan>,
536        insert_op: InsertOp,
537    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
538        // Check that the schema of the plan matches the schema of this table.
539        self.schema()
540            .logically_equivalent_names_and_types(&input.schema())?;
541
542        let table_path = &self.table_paths()[0];
543        if !table_path.is_collection() {
544            return plan_err!(
545                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
546                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
547            );
548        }
549
550        // Get the object store for the table path.
551        let store = state.runtime_env().object_store(table_path)?;
552
553        let file_list_stream = pruned_partition_list(
554            state,
555            store.as_ref(),
556            table_path,
557            &[],
558            &self.options.file_extension,
559            &self.options.table_partition_cols,
560        )
561        .await?;
562
563        let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
564        let keep_partition_by_columns =
565            state.config_options().execution.keep_partition_by_columns;
566
567        // Invalidate cache entries for this table if they exist
568        if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
569            let key = TableScopedPath {
570                table: table_path.get_table_ref().clone(),
571                path: table_path.prefix().clone(),
572            };
573            let _ = lfc.remove(&key);
574        }
575
576        // Sink related option, apart from format
577        let config = FileSinkConfig {
578            original_url: String::default(),
579            object_store_url: self.table_paths()[0].object_store(),
580            table_paths: self.table_paths().clone(),
581            file_group,
582            output_schema: self.schema(),
583            table_partition_cols: self.options.table_partition_cols.clone(),
584            insert_op,
585            keep_partition_by_columns,
586            file_extension: self.options().format.get_ext(),
587        };
588
589        let orderings = self.try_create_output_ordering(state.execution_props())?;
590        // It is sufficient to pass only one of the equivalent orderings:
591        let order_requirements = orderings.into_iter().next().map(Into::into);
592
593        self.options()
594            .format
595            .create_writer_physical_plan(input, state, config, order_requirements)
596            .await
597    }
598
599    fn get_column_default(&self, column: &str) -> Option<&Expr> {
600        self.column_defaults.get(column)
601    }
602}
603
604impl ListingTable {
605    /// Get the list of files for a scan as well as the file level statistics.
606    /// The list is grouped to let the execution plan know how the files should
607    /// be distributed to different threads / executors.
608    pub async fn list_files_for_scan<'a>(
609        &'a self,
610        ctx: &'a dyn Session,
611        filters: &'a [Expr],
612        limit: Option<usize>,
613    ) -> datafusion_common::Result<ListFilesResult> {
614        let store = if let Some(url) = self.table_paths.first() {
615            ctx.runtime_env().object_store(url)?
616        } else {
617            return Ok(ListFilesResult {
618                file_groups: vec![],
619                statistics: Statistics::new_unknown(&self.file_schema),
620                grouped_by_partition: false,
621            });
622        };
623        // list files (with partitions)
624        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
625            pruned_partition_list(
626                ctx,
627                store.as_ref(),
628                table_path,
629                filters,
630                &self.options.file_extension,
631                &self.options.table_partition_cols,
632            )
633        }))
634        .await?;
635        let meta_fetch_concurrency =
636            ctx.config_options().execution.meta_fetch_concurrency;
637        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
638        // collect the statistics if required by the config
639        let files = file_list
640            .map(|part_file| async {
641                let part_file = part_file?;
642                let statistics = if self.options.collect_stat {
643                    self.do_collect_statistics(ctx, &store, &part_file).await?
644                } else {
645                    Arc::new(Statistics::new_unknown(&self.file_schema))
646                };
647                Ok(part_file.with_statistics(statistics))
648            })
649            .boxed()
650            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
651
652        let (file_group, inexact_stats) =
653            get_files_with_limit(files, limit, self.options.collect_stat).await?;
654
655        // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N
656        //
657        // When enabled, files are grouped by their Hive partition column values, allowing
658        // FileScanConfig to declare Hash partitioning. This enables the optimizer to skip
659        // hash repartitioning for aggregates and joins on partition columns.
660        let threshold = ctx.config_options().optimizer.preserve_file_partitions;
661
662        let (file_groups, grouped_by_partition) = if threshold > 0
663            && !self.options.table_partition_cols.is_empty()
664        {
665            let grouped =
666                file_group.group_by_partition_values(self.options.target_partitions);
667            if grouped.len() >= threshold {
668                (grouped, true)
669            } else {
670                let all_files: Vec<_> =
671                    grouped.into_iter().flat_map(|g| g.into_inner()).collect();
672                (
673                    FileGroup::new(all_files).split_files(self.options.target_partitions),
674                    false,
675                )
676            }
677        } else {
678            (
679                file_group.split_files(self.options.target_partitions),
680                false,
681            )
682        };
683
684        let (file_groups, stats) = compute_all_files_statistics(
685            file_groups,
686            self.schema(),
687            self.options.collect_stat,
688            inexact_stats,
689        )?;
690
691        // Note: Statistics already include both file columns and partition columns.
692        // PartitionedFile::with_statistics automatically appends exact partition column
693        // statistics (min=max=partition_value, null_count=0, distinct_count=1) computed
694        // from partition_values.
695        Ok(ListFilesResult {
696            file_groups,
697            statistics: stats,
698            grouped_by_partition,
699        })
700    }
701
702    /// Collects statistics for a given partitioned file.
703    ///
704    /// This method first checks if the statistics for the given file are already cached.
705    /// If they are, it returns the cached statistics.
706    /// If they are not, it infers the statistics from the file and stores them in the cache.
707    async fn do_collect_statistics(
708        &self,
709        ctx: &dyn Session,
710        store: &Arc<dyn ObjectStore>,
711        part_file: &PartitionedFile,
712    ) -> datafusion_common::Result<Arc<Statistics>> {
713        match self
714            .collected_statistics
715            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
716        {
717            Some(statistics) => Ok(statistics),
718            None => {
719                let statistics = self
720                    .options
721                    .format
722                    .infer_stats(
723                        ctx,
724                        store,
725                        Arc::clone(&self.file_schema),
726                        &part_file.object_meta,
727                    )
728                    .await?;
729                let statistics = Arc::new(statistics);
730                self.collected_statistics.put_with_extra(
731                    &part_file.object_meta.location,
732                    Arc::clone(&statistics),
733                    &part_file.object_meta,
734                );
735                Ok(statistics)
736            }
737        }
738    }
739}
740
741/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
742///
743/// This function collects files from the provided stream until either:
744/// 1. The stream is exhausted
745/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
746///
747/// # Arguments
748/// * `files` - A stream of `Result<PartitionedFile>` items to process
749/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
750///   once the accumulated number of rows exceeds this limit
751/// * `collect_stats` - Whether to collect and accumulate statistics from the files
752///
753/// # Returns
754/// A `Result` containing a `FileGroup` with the collected files
755/// and a boolean indicating whether the statistics are inexact.
756///
757/// # Note
758/// The function will continue processing files if statistics are not available or if the
759/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
760/// but files will still be collected.
761async fn get_files_with_limit(
762    files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
763    limit: Option<usize>,
764    collect_stats: bool,
765) -> datafusion_common::Result<(FileGroup, bool)> {
766    let mut file_group = FileGroup::default();
767    // Fusing the stream allows us to call next safely even once it is finished.
768    let mut all_files = Box::pin(files.fuse());
769    enum ProcessingState {
770        ReadingFiles,
771        ReachedLimit,
772    }
773
774    let mut state = ProcessingState::ReadingFiles;
775    let mut num_rows = Precision::Absent;
776
777    while let Some(file_result) = all_files.next().await {
778        // Early exit if we've already reached our limit
779        if matches!(state, ProcessingState::ReachedLimit) {
780            break;
781        }
782
783        let file = file_result?;
784
785        // Update file statistics regardless of state
786        if collect_stats && let Some(file_stats) = &file.statistics {
787            num_rows = if file_group.is_empty() {
788                // For the first file, just take its row count
789                file_stats.num_rows
790            } else {
791                // For subsequent files, accumulate the counts
792                num_rows.add(&file_stats.num_rows)
793            };
794        }
795
796        // Always add the file to our group
797        file_group.push(file);
798
799        // Check if we've hit the limit (if one was specified)
800        if let Some(limit) = limit
801            && let Precision::Exact(row_count) = num_rows
802            && row_count > limit
803        {
804            state = ProcessingState::ReachedLimit;
805        }
806    }
807    // If we still have files in the stream, it means that the limit kicked
808    // in, and the statistic could have been different had we processed the
809    // files in a different order.
810    let inexact_stats = all_files.next().await.is_some();
811    Ok((file_group, inexact_stats))
812}