// datafusion_datasource/file_scan_config.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21use crate::file_groups::FileGroup;
22use crate::{
23    PartitionedFile, display::FileGroupsDisplay, file::FileSource,
24    file_compression_type::FileCompressionType, file_stream::FileStream,
25    source::DataSource, statistics::MinMaxStatistics,
26};
27use arrow::datatypes::FieldRef;
28use arrow::datatypes::{DataType, Schema, SchemaRef};
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::{
31    Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
32};
33use datafusion_execution::{
34    SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
35};
36use datafusion_expr::Operator;
37
38use datafusion_physical_expr::equivalence::project_orderings;
39use datafusion_physical_expr::expressions::{BinaryExpr, Column};
40use datafusion_physical_expr::projection::ProjectionExprs;
41use datafusion_physical_expr::utils::reassign_expr_columns;
42use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
43use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
44use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
45use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
46use datafusion_physical_plan::SortOrderPushdownResult;
47use datafusion_physical_plan::coop::cooperative;
48use datafusion_physical_plan::execution_plan::SchedulingType;
49use datafusion_physical_plan::{
50    DisplayAs, DisplayFormatType,
51    display::{ProjectSchemaDisplay, display_orderings},
52    filter_pushdown::FilterPushdownPropagation,
53    metrics::ExecutionPlanMetricsSet,
54};
55use log::{debug, warn};
56use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
57
58/// [`FileScanConfig`] represents scanning data from a group of files
59///
60/// `FileScanConfig` is used to create a [`DataSourceExec`], the physical plan
61/// for scanning files with a particular file format.
62///
63/// The [`FileSource`] (e.g. `ParquetSource`, `CsvSource`, etc.) is responsible
64/// for creating the actual execution plan to read the files based on a
65/// `FileScanConfig`. Fields in a `FileScanConfig` such as Statistics represent
66/// information about the files **before** any projection or filtering is
67/// applied in the file source.
68///
69/// Use [`FileScanConfigBuilder`] to construct a `FileScanConfig`.
70///
71/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from
72/// a `FileScanConfig`.
73///
74/// # Example
75/// ```
76/// # use std::any::Any;
77/// # use std::sync::Arc;
78/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
79/// # use object_store::ObjectStore;
80/// # use datafusion_common::Result;
81/// # use datafusion_datasource::file::FileSource;
82/// # use datafusion_datasource::file_groups::FileGroup;
83/// # use datafusion_datasource::PartitionedFile;
84/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
85/// # use datafusion_datasource::file_stream::FileOpener;
86/// # use datafusion_datasource::source::DataSourceExec;
87/// # use datafusion_datasource::table_schema::TableSchema;
88/// # use datafusion_execution::object_store::ObjectStoreUrl;
89/// # use datafusion_physical_expr::projection::ProjectionExprs;
90/// # use datafusion_physical_plan::ExecutionPlan;
91/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
92/// # let file_schema = Arc::new(Schema::new(vec![
93/// #  Field::new("c1", DataType::Int32, false),
94/// #  Field::new("c2", DataType::Int32, false),
95/// #  Field::new("c3", DataType::Int32, false),
96/// #  Field::new("c4", DataType::Int32, false),
97/// # ]));
98/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
99/// #[derive(Clone)]
100/// # struct ParquetSource {
101/// #    table_schema: TableSchema,
102/// # };
103/// # impl FileSource for ParquetSource {
104/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Result<Arc<dyn FileOpener>> { unimplemented!() }
105/// #  fn as_any(&self) -> &dyn Any { self  }
106/// #  fn table_schema(&self) -> &TableSchema { &self.table_schema }
107/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
108/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
109/// #  fn file_type(&self) -> &str { "parquet" }
110/// #  // Note that this implementation drops the projection on the floor, it is not complete!
111/// #  fn try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result<Option<Arc<dyn FileSource>>> { Ok(Some(Arc::new(self.clone()) as Arc<dyn FileSource>)) }
112/// #  }
113/// # impl ParquetSource {
114/// #  fn new(table_schema: impl Into<TableSchema>) -> Self { Self {table_schema: table_schema.into()} }
115/// # }
116/// // create FileScan config for reading parquet files from file://
117/// let object_store_url = ObjectStoreUrl::local_filesystem();
118/// let file_source = Arc::new(ParquetSource::new(file_schema.clone()));
119/// let config = FileScanConfigBuilder::new(object_store_url, file_source)
120///   .with_limit(Some(1000))            // read only the first 1000 records
121///   .with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3
122///   .expect("Failed to push down projection")
123///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
124///   .with_file(PartitionedFile::new("file1.parquet", 1234))
125///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
126///   // in a  single row group
127///   .with_file_group(FileGroup::new(vec![
128///    PartitionedFile::new("file2.parquet", 56),
129///    PartitionedFile::new("file3.parquet", 78),
130///   ])).build();
131/// // create an execution plan from the config
132/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
133/// ```
134///
135/// [`DataSourceExec`]: crate::source::DataSourceExec
136/// [`DataSourceExec::from_data_source`]: crate::source::DataSourceExec::from_data_source
#[derive(Clone)]
pub struct FileScanConfig {
    /// Object store URL, used to get an [`ObjectStore`] instance from
    /// [`RuntimeEnv::object_store`]
    ///
    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
    /// as `file://` or `s3://my_bucket`. It should not include the path to the
    /// file itself. The relevant URL prefix must be registered via
    /// [`RuntimeEnv::register_object_store`]
    ///
    /// [`ObjectStore`]: object_store::ObjectStore
    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
    pub object_store_url: ObjectStoreUrl,
    /// List of files to be processed, grouped into partitions
    ///
    /// Each file must have a schema of `file_schema` or a subset. If
    /// a particular file has a subset, the missing columns are
    /// padded with NULLs.
    ///
    /// DataFusion may attempt to read each partition of files
    /// concurrently, however files *within* a partition will be read
    /// sequentially, one after the next.
    pub file_groups: Vec<FileGroup>,
    /// Table constraints (e.g. primary key / uniqueness), used when computing
    /// equivalence properties for the scan.
    pub constraints: Constraints,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records after filtering are returned.
    pub limit: Option<usize>,
    /// Whether the scan's limit is order sensitive.
    ///
    /// When `true`, files must be read in the exact order specified to produce
    /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
    /// DataFusion may reorder file processing for optimization without affecting correctness.
    pub preserve_order: bool,
    /// All equivalent lexicographical output orderings of this file scan, in terms of
    /// [`FileSource::table_schema`]. See [`FileScanConfigBuilder::with_output_ordering`] for more
    /// details.
    ///
    /// [`Self::eq_properties`] uses this information along with projection
    /// and filtering information to compute the effective
    /// [`EquivalenceProperties`]
    pub output_ordering: Vec<LexOrdering>,
    /// File compression type (e.g. gzip, bzip2, or uncompressed)
    pub file_compression_type: FileCompressionType,
    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
    pub file_source: Arc<dyn FileSource>,
    /// Batch size while creating new batches.
    /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
    pub batch_size: Option<usize>,
    /// Expression adapter used to adapt filters and projections that are pushed down into the scan
    /// from the logical schema to the physical schema of the file.
    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    /// Statistics for the entire table (file schema + partition columns).
    /// See [`FileScanConfigBuilder::with_statistics`] for more details.
    ///
    /// The effective statistics are computed on-demand via
    /// [`ProjectionExprs::project_statistics`].
    ///
    /// Note that this field is pub(crate) because accessing it directly from outside
    /// would be incorrect if there are filters being applied, thus this should be accessed
    /// via [`FileScanConfig::statistics`].
    pub(crate) statistics: Statistics,
    /// When true, file_groups are organized by partition column values
    /// and output_partitioning will return Hash partitioning on partition columns.
    /// This allows the optimizer to skip hash repartitioning for aggregates and joins
    /// on partition columns.
    ///
    /// If the number of file partitions > target_partitions, the file partitions will be grouped
    /// in a round-robin fashion such that number of file partitions = target_partitions.
    pub partitioned_by_file_group: bool,
}
208
209/// A builder for [`FileScanConfig`]'s.
210///
211/// Example:
212///
213/// ```rust
214/// # use std::sync::Arc;
215/// # use arrow::datatypes::{DataType, Field, Schema};
216/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig};
217/// # use datafusion_datasource::file_compression_type::FileCompressionType;
218/// # use datafusion_datasource::file_groups::FileGroup;
219/// # use datafusion_datasource::PartitionedFile;
220/// # use datafusion_datasource::table_schema::TableSchema;
221/// # use datafusion_execution::object_store::ObjectStoreUrl;
222/// # use datafusion_common::Statistics;
223/// # use datafusion_datasource::file::FileSource;
224///
225/// # fn main() {
226/// # fn with_source(file_source: Arc<dyn FileSource>) {
227///     // Create a schema for our Parquet files
228///     let file_schema = Arc::new(Schema::new(vec![
229///         Field::new("id", DataType::Int32, false),
230///         Field::new("value", DataType::Utf8, false),
231///     ]));
232///
233///     // Create partition columns
234///     let partition_cols = vec![
235///         Arc::new(Field::new("date", DataType::Utf8, false)),
236///     ];
237///
238///     // Create table schema with file schema and partition columns
239///     let table_schema = TableSchema::new(file_schema, partition_cols);
240///
241///     // Create a builder for scanning Parquet files from a local filesystem
242///     let config = FileScanConfigBuilder::new(
243///         ObjectStoreUrl::local_filesystem(),
244///         file_source,
245///     )
246///     // Set a limit of 1000 rows
247///     .with_limit(Some(1000))
248///     // Project only the first column
249///     .with_projection_indices(Some(vec![0]))
250///     .expect("Failed to push down projection")
251///     // Add a file group with two files
252///     .with_file_group(FileGroup::new(vec![
253///         PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024),
254///         PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048),
255///     ]))
256///     // Set compression type
257///     .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
258///     // Build the final config
259///     .build();
260/// # }
261/// # }
262/// ```
#[derive(Clone)]
pub struct FileScanConfigBuilder {
    /// See [`FileScanConfig::object_store_url`]
    object_store_url: ObjectStoreUrl,
    /// See [`FileScanConfig::file_source`]; also receives any projection
    /// pushed down via [`Self::with_projection_indices`]
    file_source: Arc<dyn FileSource>,
    /// See [`FileScanConfig::limit`]
    limit: Option<usize>,
    /// See [`FileScanConfig::preserve_order`]
    preserve_order: bool,
    /// `None` defaults to [`Constraints::default`] at build time
    constraints: Option<Constraints>,
    /// See [`FileScanConfig::file_groups`]
    file_groups: Vec<FileGroup>,
    /// `None` defaults to [`Statistics::new_unknown`] at build time
    statistics: Option<Statistics>,
    /// See [`FileScanConfig::output_ordering`]
    output_ordering: Vec<LexOrdering>,
    /// `None` defaults to [`FileCompressionType::UNCOMPRESSED`] at build time
    file_compression_type: Option<FileCompressionType>,
    /// See [`FileScanConfig::batch_size`]
    batch_size: Option<usize>,
    /// See [`FileScanConfig::expr_adapter_factory`]
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    /// See [`FileScanConfig::partitioned_by_file_group`]
    partitioned_by_file_group: bool,
}
278
279impl FileScanConfigBuilder {
280    /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files.
281    ///
282    /// # Parameters:
283    /// * `object_store_url`: See [`FileScanConfig::object_store_url`]
284    /// * `file_source`: See [`FileScanConfig::file_source`]. The file source must have
285    ///   a schema set via its constructor.
286    pub fn new(
287        object_store_url: ObjectStoreUrl,
288        file_source: Arc<dyn FileSource>,
289    ) -> Self {
290        Self {
291            object_store_url,
292            file_source,
293            file_groups: vec![],
294            statistics: None,
295            output_ordering: vec![],
296            file_compression_type: None,
297            limit: None,
298            preserve_order: false,
299            constraints: None,
300            batch_size: None,
301            expr_adapter_factory: None,
302            partitioned_by_file_group: false,
303        }
304    }
305
306    /// Set the maximum number of records to read from this plan.
307    ///
308    /// If `None`, all records after filtering are returned.
309    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
310        self.limit = limit;
311        self
312    }
313
314    /// Set whether the limit should be order-sensitive.
315    ///
316    /// When `true`, files must be read in the exact order specified to produce
317    /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
318    /// DataFusion may reorder file processing for optimization without
319    /// affecting correctness.
320    pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
321        self.preserve_order = order_sensitive;
322        self
323    }
324
325    /// Set the file source for scanning files.
326    ///
327    /// This method allows you to change the file source implementation (e.g.
328    /// ParquetSource, CsvSource, etc.) after the builder has been created.
329    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
330        self.file_source = file_source;
331        self
332    }
333
334    /// Return the table schema
335    pub fn table_schema(&self) -> &SchemaRef {
336        self.file_source.table_schema().table_schema()
337    }
338
339    /// Set the columns on which to project the data. Indexes that are higher than the
340    /// number of columns of `file_schema` refer to `table_partition_cols`.
341    ///
342    /// # Deprecated
343    /// Use [`Self::with_projection_indices`] instead. This method will be removed in a future release.
344    #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
345    pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
346        match self.clone().with_projection_indices(indices) {
347            Ok(builder) => builder,
348            Err(e) => {
349                warn!(
350                    "Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
351                );
352                self
353            }
354        }
355    }
356
357    /// Set the columns on which to project the data using column indices.
358    ///
359    /// This method attempts to push down the projection to the underlying file
360    /// source if supported. If the file source does not support projection
361    /// pushdown, an error is returned.
362    ///
363    /// Indexes that are higher than the number of columns of `file_schema`
364    /// refer to `table_partition_cols`.
365    pub fn with_projection_indices(
366        mut self,
367        indices: Option<Vec<usize>>,
368    ) -> Result<Self> {
369        let projection_exprs = indices.map(|indices| {
370            ProjectionExprs::from_indices(
371                &indices,
372                self.file_source.table_schema().table_schema(),
373            )
374        });
375        let Some(projection_exprs) = projection_exprs else {
376            return Ok(self);
377        };
378        let new_source = self
379            .file_source
380            .try_pushdown_projection(&projection_exprs)
381            .map_err(|e| {
382                internal_datafusion_err!(
383                    "Failed to push down projection in FileScanConfigBuilder::build: {e}"
384                )
385            })?;
386        if let Some(new_source) = new_source {
387            self.file_source = new_source;
388        } else {
389            internal_err!(
390                "FileSource {} does not support projection pushdown",
391                self.file_source.file_type()
392            )?;
393        }
394        Ok(self)
395    }
396
397    /// Set the table constraints
398    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
399        self.constraints = Some(constraints);
400        self
401    }
402
403    /// Set the statistics of the files, including partition
404    /// columns. Defaults to [`Statistics::new_unknown`].
405    ///
406    /// These statistics are for the entire table (file schema + partition
407    /// columns) before any projection or filtering is applied. Projections are
408    /// applied when statistics are retrieved, and if a filter is present,
409    /// [`FileScanConfig::statistics`] will mark the statistics as inexact
410    /// (counts are not adjusted).
411    ///
412    /// Projections and filters may be applied by the file source, either by
413    /// [`Self::with_projection_indices`] or a preexisting
414    /// [`FileSource::projection`] or [`FileSource::filter`].
415    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
416        self.statistics = Some(statistics);
417        self
418    }
419
420    /// Set the list of files to be processed, grouped into partitions.
421    ///
422    /// Each file must have a schema of `file_schema` or a subset. If
423    /// a particular file has a subset, the missing columns are
424    /// padded with NULLs.
425    ///
426    /// DataFusion may attempt to read each partition of files
427    /// concurrently, however files *within* a partition will be read
428    /// sequentially, one after the next.
429    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
430        self.file_groups = file_groups;
431        self
432    }
433
434    /// Add a new file group
435    ///
436    /// See [`Self::with_file_groups`] for more information
437    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
438        self.file_groups.push(file_group);
439        self
440    }
441
442    /// Add a file as a single group
443    ///
444    /// See [`Self::with_file_groups`] for more information.
445    pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
446        self.with_file_group(FileGroup::new(vec![partitioned_file]))
447    }
448
449    /// Set the output ordering of the files
450    ///
451    /// The expressions are in terms of the entire table schema (file schema +
452    /// partition columns), before any projection or filtering from the file
453    /// scan is applied.
454    ///
455    /// This is used for optimization purposes, e.g. to determine if a file scan
456    /// can satisfy an `ORDER BY` without an additional sort.
457    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
458        self.output_ordering = output_ordering;
459        self
460    }
461
462    /// Set the file compression type
463    pub fn with_file_compression_type(
464        mut self,
465        file_compression_type: FileCompressionType,
466    ) -> Self {
467        self.file_compression_type = Some(file_compression_type);
468        self
469    }
470
471    /// Set the batch_size property
472    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
473        self.batch_size = batch_size;
474        self
475    }
476
477    /// Register an expression adapter used to adapt filters and projections that are pushed down into the scan
478    /// from the logical schema to the physical schema of the file.
479    /// This can include things like:
480    /// - Column ordering changes
481    /// - Handling of missing columns
482    /// - Rewriting expression to use pre-computed values or file format specific optimizations
483    pub fn with_expr_adapter(
484        mut self,
485        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
486    ) -> Self {
487        self.expr_adapter_factory = expr_adapter;
488        self
489    }
490
491    /// Set whether file groups are organized by partition column values.
492    ///
493    /// When set to true, the output partitioning will be declared as Hash partitioning
494    /// on the partition columns.
495    pub fn with_partitioned_by_file_group(
496        mut self,
497        partitioned_by_file_group: bool,
498    ) -> Self {
499        self.partitioned_by_file_group = partitioned_by_file_group;
500        self
501    }
502
503    /// Build the final [`FileScanConfig`] with all the configured settings.
504    ///
505    /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
506    /// Any unset optional fields will use their default values.
507    ///
508    /// # Errors
509    /// Returns an error if projection pushdown fails or if schema operations fail.
510    pub fn build(self) -> FileScanConfig {
511        let Self {
512            object_store_url,
513            file_source,
514            limit,
515            preserve_order,
516            constraints,
517            file_groups,
518            statistics,
519            output_ordering,
520            file_compression_type,
521            batch_size,
522            expr_adapter_factory: expr_adapter,
523            partitioned_by_file_group,
524        } = self;
525
526        let constraints = constraints.unwrap_or_default();
527        let statistics = statistics.unwrap_or_else(|| {
528            Statistics::new_unknown(file_source.table_schema().table_schema())
529        });
530        let file_compression_type =
531            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
532
533        // If there is an output ordering, we should preserve it.
534        let preserve_order = preserve_order || !output_ordering.is_empty();
535
536        FileScanConfig {
537            object_store_url,
538            file_source,
539            limit,
540            preserve_order,
541            constraints,
542            file_groups,
543            output_ordering,
544            file_compression_type,
545            batch_size,
546            expr_adapter_factory: expr_adapter,
547            statistics,
548            partitioned_by_file_group,
549        }
550    }
551}
552
553impl From<FileScanConfig> for FileScanConfigBuilder {
554    fn from(config: FileScanConfig) -> Self {
555        Self {
556            object_store_url: config.object_store_url,
557            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
558            file_groups: config.file_groups,
559            statistics: Some(config.statistics),
560            output_ordering: config.output_ordering,
561            file_compression_type: Some(config.file_compression_type),
562            limit: config.limit,
563            preserve_order: config.preserve_order,
564            constraints: Some(config.constraints),
565            batch_size: config.batch_size,
566            expr_adapter_factory: config.expr_adapter_factory,
567            partitioned_by_file_group: config.partitioned_by_file_group,
568        }
569    }
570}
571
572impl DataSource for FileScanConfig {
573    fn open(
574        &self,
575        partition: usize,
576        context: Arc<TaskContext>,
577    ) -> Result<SendableRecordBatchStream> {
578        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
579        let batch_size = self
580            .batch_size
581            .unwrap_or_else(|| context.session_config().batch_size());
582
583        let source = self.file_source.with_batch_size(batch_size);
584
585        let opener = source.create_file_opener(object_store, self, partition)?;
586
587        let stream = FileStream::new(self, partition, opener, source.metrics())?;
588        Ok(Box::pin(cooperative(stream)))
589    }
590
    /// Return `self` as [`Any`] so callers can downcast back to the concrete
    /// `FileScanConfig` type.
    fn as_any(&self) -> &dyn Any {
        self
    }
594
595    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
596        match t {
597            DisplayFormatType::Default | DisplayFormatType::Verbose => {
598                let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
599                let orderings = get_projected_output_ordering(self, &schema);
600
601                write!(f, "file_groups=")?;
602                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
603
604                if !schema.fields().is_empty() {
605                    if let Some(projection) = self.file_source.projection() {
606                        // This matches what ProjectionExec does.
607                        // TODO: can we put this into ProjectionExprs so that it's shared code?
608                        let expr: Vec<String> = projection
609                            .as_ref()
610                            .iter()
611                            .map(|proj_expr| {
612                                if let Some(column) =
613                                    proj_expr.expr.as_any().downcast_ref::<Column>()
614                                {
615                                    if column.name() == proj_expr.alias {
616                                        column.name().to_string()
617                                    } else {
618                                        format!(
619                                            "{} as {}",
620                                            proj_expr.expr, proj_expr.alias
621                                        )
622                                    }
623                                } else {
624                                    format!("{} as {}", proj_expr.expr, proj_expr.alias)
625                                }
626                            })
627                            .collect();
628                        write!(f, ", projection=[{}]", expr.join(", "))?;
629                    } else {
630                        write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
631                    }
632                }
633
634                if let Some(limit) = self.limit {
635                    write!(f, ", limit={limit}")?;
636                }
637
638                display_orderings(f, &orderings)?;
639
640                if !self.constraints.is_empty() {
641                    write!(f, ", {}", self.constraints)?;
642                }
643
644                self.fmt_file_source(t, f)
645            }
646            DisplayFormatType::TreeRender => {
647                writeln!(f, "format={}", self.file_source.file_type())?;
648                self.file_source.fmt_extra(t, f)?;
649                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
650                writeln!(f, "files={num_files}")?;
651                Ok(())
652            }
653        }
654    }
655
656    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
657    fn repartitioned(
658        &self,
659        target_partitions: usize,
660        repartition_file_min_size: usize,
661        output_ordering: Option<LexOrdering>,
662    ) -> Result<Option<Arc<dyn DataSource>>> {
663        // When files are grouped by partition values, we cannot allow byte-range
664        // splitting. It would mix rows from different partition values across
665        // file groups, breaking the Hash partitioning.
666        if self.partitioned_by_file_group {
667            return Ok(None);
668        }
669
670        let source = self.file_source.repartitioned(
671            target_partitions,
672            repartition_file_min_size,
673            output_ordering,
674            self,
675        )?;
676
677        Ok(source.map(|s| Arc::new(s) as _))
678    }
679
680    /// Returns the output partitioning for this file scan.
681    ///
682    /// When `partitioned_by_file_group` is true, this returns `Partitioning::Hash` on
683    /// the Hive partition columns, allowing the optimizer to skip hash repartitioning
684    /// for aggregates and joins on those columns.
685    ///
686    /// Tradeoffs
687    /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with
688    ///   `GROUP BY` or `ORDER BY` on partition columns.
689    /// - Cost: Files are grouped by partition values rather than split by byte
690    ///   ranges, which may reduce I/O parallelism when partition sizes are uneven.
691    ///   For simple aggregations without `ORDER BY`, this cost may outweigh the benefit.
692    ///
693    /// Follow-up Work
694    /// - Idea: Could allow byte-range splitting within partition-aware groups,
695    ///   preserving I/O parallelism while maintaining partition semantics.
696    fn output_partitioning(&self) -> Partitioning {
697        if self.partitioned_by_file_group {
698            let partition_cols = self.table_partition_cols();
699            if !partition_cols.is_empty() {
700                let projected_schema = match self.projected_schema() {
701                    Ok(schema) => schema,
702                    Err(_) => {
703                        debug!(
704                            "Could not get projected schema, falling back to UnknownPartitioning."
705                        );
706                        return Partitioning::UnknownPartitioning(self.file_groups.len());
707                    }
708                };
709
710                // Build Column expressions for partition columns based on their
711                // position in the projected schema
712                let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
713                for partition_col in partition_cols {
714                    if let Some((idx, _)) = projected_schema
715                        .fields()
716                        .iter()
717                        .enumerate()
718                        .find(|(_, f)| f.name() == partition_col.name())
719                    {
720                        exprs.push(Arc::new(Column::new(partition_col.name(), idx)));
721                    }
722                }
723
724                if exprs.len() == partition_cols.len() {
725                    return Partitioning::Hash(exprs, self.file_groups.len());
726                }
727            }
728        }
729        Partitioning::UnknownPartitioning(self.file_groups.len())
730    }
731
732    /// Computes the effective equivalence properties of this file scan, taking
733    /// into account the file schema, any projections or filters applied by the
734    /// file source, and the output ordering.
735    fn eq_properties(&self) -> EquivalenceProperties {
736        let schema = self.file_source.table_schema().table_schema();
737        let mut eq_properties = EquivalenceProperties::new_with_orderings(
738            Arc::clone(schema),
739            self.validated_output_ordering(),
740        )
741        .with_constraints(self.constraints.clone());
742
743        if let Some(filter) = self.file_source.filter() {
744            // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
745            // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
746            match Self::add_filter_equivalence_info(&filter, &mut eq_properties, schema) {
747                Ok(()) => {}
748                Err(e) => {
749                    warn!("Failed to add filter equivalence info: {e}");
750                    #[cfg(debug_assertions)]
751                    panic!("Failed to add filter equivalence info: {e}");
752                }
753            }
754        }
755
756        if let Some(projection) = self.file_source.projection() {
757            match (
758                projection.project_schema(schema),
759                projection.projection_mapping(schema),
760            ) {
761                (Ok(output_schema), Ok(mapping)) => {
762                    eq_properties =
763                        eq_properties.project(&mapping, Arc::new(output_schema));
764                }
765                (Err(e), _) | (_, Err(e)) => {
766                    warn!("Failed to project equivalence properties: {e}");
767                    #[cfg(debug_assertions)]
768                    panic!("Failed to project equivalence properties: {e}");
769                }
770            }
771        }
772
773        eq_properties
774    }
775
    /// This scan yields cooperatively, so the scheduler does not need to
    /// dedicate a blocking thread to it.
    fn scheduling_type(&self) -> SchedulingType {
        SchedulingType::Cooperative
    }
779
780    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
781        if let Some(partition) = partition {
782            // Get statistics for a specific partition
783            // Note: FileGroup statistics include partition columns (computed from partition_values)
784            if let Some(file_group) = self.file_groups.get(partition)
785                && let Some(stat) = file_group.file_statistics(None)
786            {
787                // Project the statistics based on the projection
788                let output_schema = self.projected_schema()?;
789                return if let Some(projection) = self.file_source.projection() {
790                    projection.project_statistics(stat.clone(), &output_schema)
791                } else {
792                    Ok(stat.clone())
793                };
794            }
795            // If no statistics available for this partition, return unknown
796            Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
797        } else {
798            // Return aggregate statistics across all partitions
799            let statistics = self.statistics();
800            let projection = self.file_source.projection();
801            let output_schema = self.projected_schema()?;
802            if let Some(projection) = &projection {
803                projection.project_statistics(statistics.clone(), &output_schema)
804            } else {
805                Ok(statistics)
806            }
807        }
808    }
809
810    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
811        let source = FileScanConfigBuilder::from(self.clone())
812            .with_limit(limit)
813            .build();
814        Some(Arc::new(source))
815    }
816
    /// The maximum number of rows to read (the pushed-down `LIMIT`), if any.
    fn fetch(&self) -> Option<usize> {
        self.limit
    }
820
    /// Execution metrics are owned by the file source; return a clone of its
    /// metrics set.
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }
824
825    fn try_swapping_with_projection(
826        &self,
827        projection: &ProjectionExprs,
828    ) -> Result<Option<Arc<dyn DataSource>>> {
829        match self.file_source.try_pushdown_projection(projection)? {
830            Some(new_source) => {
831                let mut new_file_scan_config = self.clone();
832                new_file_scan_config.file_source = new_source;
833                Ok(Some(Arc::new(new_file_scan_config) as Arc<dyn DataSource>))
834            }
835            None => Ok(None),
836        }
837    }
838
839    fn try_pushdown_filters(
840        &self,
841        filters: Vec<Arc<dyn PhysicalExpr>>,
842        config: &ConfigOptions,
843    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
844        // Remap filter Column indices to match the table schema (file + partition columns).
845        // This is necessary because filters refer to the output schema of this `DataSource`
846        // (e.g., after projection pushdown has been applied) and need to be remapped to the table schema
847        // before being passed to the file source
848        //
849        // For example, consider a filter `c1_c2 > 5` being pushed down. If the
850        // `DataSource` has a projection `c1 + c2 as c1_c2`, the filter must be rewritten
851        // to refer to the table schema `c1 + c2 > 5`
852        let table_schema = self.file_source.table_schema().table_schema();
853        let filters_to_remap = if let Some(projection) = self.file_source.projection() {
854            filters
855                .into_iter()
856                .map(|filter| projection.unproject_expr(&filter))
857                .collect::<Result<Vec<_>>>()?
858        } else {
859            filters
860        };
861        // Now remap column indices to match the table schema.
862        let remapped_filters = filters_to_remap
863            .into_iter()
864            .map(|filter| reassign_expr_columns(filter, table_schema))
865            .collect::<Result<Vec<_>>>()?;
866
867        let result = self
868            .file_source
869            .try_pushdown_filters(remapped_filters, config)?;
870        match result.updated_node {
871            Some(new_file_source) => {
872                let mut new_file_scan_config = self.clone();
873                new_file_scan_config.file_source = new_file_source;
874                Ok(FilterPushdownPropagation {
875                    filters: result.filters,
876                    updated_node: Some(Arc::new(new_file_scan_config) as _),
877                })
878            }
879            None => {
880                // If the file source does not support filter pushdown, return the original config
881                Ok(FilterPushdownPropagation {
882                    filters: result.filters,
883                    updated_node: None,
884                })
885            }
886        }
887    }
888
889    fn try_pushdown_sort(
890        &self,
891        order: &[PhysicalSortExpr],
892    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
893        // Delegate to FileSource to see if it can optimize for the requested ordering.
894        let pushdown_result = self
895            .file_source
896            .try_pushdown_sort(order, &self.eq_properties())?;
897
898        match pushdown_result {
899            SortOrderPushdownResult::Exact { inner } => {
900                Ok(SortOrderPushdownResult::Exact {
901                    inner: self.rebuild_with_source(inner, true, order)?,
902                })
903            }
904            SortOrderPushdownResult::Inexact { inner } => {
905                Ok(SortOrderPushdownResult::Inexact {
906                    inner: self.rebuild_with_source(inner, false, order)?,
907                })
908            }
909            SortOrderPushdownResult::Unsupported => {
910                Ok(SortOrderPushdownResult::Unsupported)
911            }
912        }
913    }
914
915    fn with_preserve_order(&self, preserve_order: bool) -> Option<Arc<dyn DataSource>> {
916        if self.preserve_order == preserve_order {
917            return Some(Arc::new(self.clone()));
918        }
919
920        let new_config = FileScanConfig {
921            preserve_order,
922            ..self.clone()
923        };
924        Some(Arc::new(new_config))
925    }
926}
927
928impl FileScanConfig {
929    /// Returns only the output orderings that are validated against actual
930    /// file group statistics.
931    ///
932    /// For example, individual files may be ordered by `col1 ASC`,
933    /// but if we have files with these min/max statistics in a single partition / file group:
934    ///
935    /// - file1: min(col1) = 10, max(col1) = 20
936    /// - file2: min(col1) = 5, max(col1) = 15
937    ///
938    /// Because reading file1 followed by file2 would produce out-of-order output (there is overlap
939    /// in the ranges), we cannot retain `col1 ASC` as a valid output ordering.
940    ///
941    /// Similarly this would not be a valid order (non-overlapping ranges but not ordered):
942    ///
943    /// - file1: min(col1) = 20, max(col1) = 30
944    /// - file2: min(col1) = 10, max(col1) = 15
945    ///
946    /// On the other hand if we had:
947    ///
948    /// - file1: min(col1) = 5, max(col1) = 15
949    /// - file2: min(col1) = 16, max(col1) = 25
950    ///
951    /// Then we know that reading file1 followed by file2 will produce ordered output,
952    /// so `col1 ASC` would be retained.
953    ///
954    /// Note that we are checking for ordering *within* *each* file group / partition,
955    /// files in different partitions are read independently and do not affect each other's ordering.
956    /// Merging of the multiple partition streams into a single ordered stream is handled
957    /// upstream e.g. by `SortPreservingMergeExec`.
958    fn validated_output_ordering(&self) -> Vec<LexOrdering> {
959        let schema = self.file_source.table_schema().table_schema();
960        validate_orderings(&self.output_ordering, schema, &self.file_groups, None)
961    }
962
    /// Get the file schema (the schema of the files themselves, *without* the
    /// table partition columns)
    pub fn file_schema(&self) -> &SchemaRef {
        self.file_source.table_schema().file_schema()
    }
967
    /// Get the table partition columns (delegates to the file source's
    /// table-schema description)
    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
        self.file_source.table_schema().table_partition_cols()
    }
972
973    /// Returns the unprojected table statistics, marking them as inexact if filters are present.
974    ///
975    /// When filters are pushed down (including pruning predicates and bloom filters),
976    /// we can't guarantee the statistics are exact because we don't know how many
977    /// rows will be filtered out.
978    pub fn statistics(&self) -> Statistics {
979        if self.file_source.filter().is_some() {
980            self.statistics.clone().to_inexact()
981        } else {
982            self.statistics.clone()
983        }
984    }
985
986    pub fn projected_schema(&self) -> Result<Arc<Schema>> {
987        let schema = self.file_source.table_schema().table_schema();
988        match self.file_source.projection() {
989            Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)),
990            None => Ok(Arc::clone(schema)),
991        }
992    }
993
994    fn add_filter_equivalence_info(
995        filter: &Arc<dyn PhysicalExpr>,
996        eq_properties: &mut EquivalenceProperties,
997        schema: &Schema,
998    ) -> Result<()> {
999        // Gather valid equality pairs from the filter expression
1000        let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
1001            // Ignore any binary expressions that reference non-existent columns in the current schema
1002            // (e.g. due to unnecessary projections being removed)
1003            reassign_expr_columns(Arc::clone(expr), schema)
1004                .ok()
1005                .and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
1006                    Some(expr) if expr.op() == &Operator::Eq => {
1007                        Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
1008                    }
1009                    _ => None,
1010                })
1011        });
1012
1013        for (lhs, rhs) in equal_pairs {
1014            eq_properties.add_equal_conditions(lhs, rhs)?
1015        }
1016
1017        Ok(())
1018    }
1019
    /// Returns whether newlines in values are supported.
    ///
    /// This method always returns `false`, regardless of configuration. The
    /// actual newlines_in_values setting has been moved to [`CsvSource`] and
    /// should be accessed via [`CsvSource::csv_options()`] instead.
    ///
    /// [`CsvSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html
    /// [`CsvSource::csv_options()`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html#method.csv_options
    #[deprecated(
        since = "52.0.0",
        note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn newlines_in_values(&self) -> bool {
        false
    }
1035
1036    #[deprecated(
1037        since = "52.0.0",
1038        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
1039    )]
1040    pub fn projected_constraints(&self) -> Constraints {
1041        let props = self.eq_properties();
1042        props.constraints().clone()
1043    }
1044
    /// Deprecated: returns the projected column indices restricted to indices
    /// that fall inside the file schema (indices at or beyond the file-schema
    /// length — presumably partition columns, TODO confirm — are dropped), or
    /// `None` when the file source has no projection.
    #[deprecated(
        since = "52.0.0",
        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        #[expect(deprecated)]
        self.file_source.projection().as_ref().map(|p| {
            // Keep only indices addressing fields of the file schema.
            p.ordered_column_indices()
                .into_iter()
                .filter(|&i| i < self.file_schema().fields().len())
                .collect::<Vec<_>>()
        })
    }
1058
    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
    ///
    /// The method distributes files across a target number of partitions while ensuring
    /// files within each partition maintain sort order based on their min/max statistics.
    ///
    /// The algorithm works by:
    /// 1. Takes files sorted by minimum values
    /// 2. For each file:
    ///   - Finds eligible groups (empty or where file's min > group's last max)
    ///   - Selects the smallest eligible group
    ///   - Creates a new group if needed
    ///
    /// # Parameters
    /// * `table_schema`: Schema containing information about the columns
    /// * `file_groups`: The original file groups to split
    /// * `sort_order`: The lexicographical ordering to maintain within each group
    /// * `target_partitions`: The desired number of output partitions
    ///
    /// # Returns
    /// A new set of file groups, where files within each group are non-overlapping with respect to
    /// their min/max statistics and maintain the specified sort order. Note that
    /// *more* than `target_partitions` groups can result when files overlap heavily.
    ///
    /// # Errors
    /// Fails when `target_partitions == 0`, or when min/max statistics cannot be
    /// built for the files under `sort_order`.
    pub fn split_groups_by_statistics_with_target_partitions(
        table_schema: &SchemaRef,
        file_groups: &[FileGroup],
        sort_order: &LexOrdering,
        target_partitions: usize,
    ) -> Result<Vec<FileGroup>> {
        if target_partitions == 0 {
            return Err(internal_datafusion_err!(
                "target_partitions must be greater than 0"
            ));
        }

        let flattened_files = file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .collect::<Vec<_>>();

        if flattened_files.is_empty() {
            return Ok(vec![]);
        }

        let statistics = MinMaxStatistics::new_from_files(
            sort_order,
            table_schema,
            None,
            flattened_files.iter().copied(),
        )?;

        // Indices into `flattened_files`, visited in ascending order of each
        // file's minimum sort-key value.
        let indices_sorted_by_min = statistics.min_values_sorted();

        // Initialize with target_partitions empty groups
        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];

        for (idx, min) in indices_sorted_by_min {
            // Eligible groups: empty ones, or those whose last file ends strictly
            // before this file begins (the `||` short-circuit guarantees `last()`
            // is only called on non-empty groups). Among eligible groups,
            // `min_by_key` picks the smallest (the first one on ties) to balance
            // file counts across partitions.
            if let Some((_, group)) = file_groups_indices
                .iter_mut()
                .enumerate()
                .filter(|(_, group)| {
                    group.is_empty()
                        || min
                            > statistics
                                .max(*group.last().expect("groups should not be empty"))
                })
                .min_by_key(|(_, group)| group.len())
            {
                group.push(idx);
            } else {
                // Create a new group if no existing group fits; this is how the
                // result can grow beyond `target_partitions`.
                file_groups_indices.push(vec![idx]);
            }
        }

        // Remove any empty groups
        file_groups_indices.retain(|group| !group.is_empty());

        // Assemble indices back into groups of PartitionedFiles
        Ok(file_groups_indices
            .into_iter()
            .map(|file_group_indices| {
                FileGroup::new(
                    file_group_indices
                        .into_iter()
                        .map(|idx| flattened_files[idx].clone())
                        .collect(),
                )
            })
            .collect())
    }
1148
1149    /// Attempts to do a bin-packing on files into file groups, such that any two files
1150    /// in a file group are ordered and non-overlapping with respect to their statistics.
1151    /// It will produce the smallest number of file groups possible.
1152    pub fn split_groups_by_statistics(
1153        table_schema: &SchemaRef,
1154        file_groups: &[FileGroup],
1155        sort_order: &LexOrdering,
1156    ) -> Result<Vec<FileGroup>> {
1157        let flattened_files = file_groups
1158            .iter()
1159            .flat_map(FileGroup::iter)
1160            .collect::<Vec<_>>();
1161        // First Fit:
1162        // * Choose the first file group that a file can be placed into.
1163        // * If it fits into no existing file groups, create a new one.
1164        //
1165        // By sorting files by min values and then applying first-fit bin packing,
1166        // we can produce the smallest number of file groups such that
1167        // files within a group are in order and non-overlapping.
1168        //
1169        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
1170        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
1171
1172        if flattened_files.is_empty() {
1173            return Ok(vec![]);
1174        }
1175
1176        let statistics = MinMaxStatistics::new_from_files(
1177            sort_order,
1178            table_schema,
1179            None,
1180            flattened_files.iter().copied(),
1181        )
1182        .map_err(|e| {
1183            e.context("construct min/max statistics for split_groups_by_statistics")
1184        })?;
1185
1186        let indices_sorted_by_min = statistics.min_values_sorted();
1187        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
1188
1189        for (idx, min) in indices_sorted_by_min {
1190            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
1191                // If our file is non-overlapping and comes _after_ the last file,
1192                // it fits in this file group.
1193                min > statistics.max(
1194                    *group
1195                        .last()
1196                        .expect("groups should be nonempty at construction"),
1197                )
1198            });
1199            match file_group_to_insert {
1200                Some(group) => group.push(idx),
1201                None => file_groups_indices.push(vec![idx]),
1202            }
1203        }
1204
1205        // Assemble indices back into groups of PartitionedFiles
1206        Ok(file_groups_indices
1207            .into_iter()
1208            .map(|file_group_indices| {
1209                file_group_indices
1210                    .into_iter()
1211                    .map(|idx| flattened_files[idx].clone())
1212                    .collect()
1213            })
1214            .collect())
1215    }
1216
    /// Writes the file-source suffix of the display output: the file type
    /// followed by any source-specific extra details (`fmt_extra`).
    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }
1222
    /// Returns a reference to the wrapped [`FileSource`]
    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }
1227
    /// Helper: Rebuild FileScanConfig with new file source after a successful
    /// sort pushdown.
    ///
    /// * `new_file_source` — the updated source returned by the pushdown.
    /// * `is_exact` — whether the source guarantees the requested ordering
    ///   exactly; when `false` the declared `output_ordering` is cleared,
    ///   since it can no longer be promised.
    /// * `order` — the ordering the caller requested.
    fn rebuild_with_source(
        &self,
        new_file_source: Arc<dyn FileSource>,
        is_exact: bool,
        order: &[PhysicalSortExpr],
    ) -> Result<Arc<dyn DataSource>> {
        let mut new_config = self.clone();

        // Reverse file order (within each group) if the caller is requesting a reversal of this
        // scan's declared output ordering.
        //
        // Historically this function always reversed `file_groups` because it was only reached
        // via `FileSource::try_reverse_output` (where a reversal was the only supported
        // optimization).
        //
        // Now that `FileSource::try_pushdown_sort` is generic, we must not assume reversal: other
        // optimizations may become possible (e.g. already-sorted data, statistics-based file
        // reordering). Therefore we only reverse files when it is known to help satisfy the
        // requested ordering.
        let reverse_file_groups = if self.output_ordering.is_empty() {
            // No declared ordering — nothing a reversal could help satisfy.
            false
        } else if let Some(requested) = LexOrdering::new(order.iter().cloned()) {
            // Reverse only when some declared ordering, projected onto the output
            // schema, is exactly the reverse of the requested one.
            let projected_schema = self.projected_schema()?;
            let orderings = project_orderings(&self.output_ordering, &projected_schema);
            orderings
                .iter()
                .any(|ordering| ordering.is_reverse(&requested))
        } else {
            // `order` could not form a `LexOrdering` (presumably empty — confirm
            // against `LexOrdering::new`'s contract) — no reversal.
            false
        };

        if reverse_file_groups {
            new_config.file_groups = new_config
                .file_groups
                .into_iter()
                .map(|group| {
                    let mut files = group.into_inner();
                    files.reverse();
                    files.into()
                })
                .collect();
        }

        new_config.file_source = new_file_source;

        // Phase 1: Clear output_ordering for Inexact
        // (we're only reversing row groups, not guaranteeing perfect ordering)
        if !is_exact {
            new_config.output_ordering = vec![];
        }

        Ok(Arc::new(new_config))
    }
1282}
1283
impl Debug for FileScanConfig {
    // Debug output is built from the Debug forms of the URL and statistics plus
    // the verbose DisplayAs rendering, wrapped in `FileScanConfig {…}`.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "FileScanConfig {{")?;
        write!(f, "object_store_url={:?}, ", self.object_store_url)?;

        // Use the `statistics()` accessor (not the raw field) so inexactness
        // introduced by pushed-down filters is reflected in the output.
        write!(f, "statistics={:?}, ", self.statistics())?;

        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
        write!(f, "}}")
    }
}
1295
impl DisplayAs for FileScanConfig {
    // Renders file groups, projection, limit, orderings and constraints; empty
    // or absent pieces are omitted entirely.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        // A schema-projection failure is surfaced as a plain fmt::Error, since
        // Display has no channel for the underlying error.
        let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
        // Only orderings the file-group statistics can guarantee are shown.
        let orderings = get_projected_output_ordering(self, &schema);

        write!(f, "file_groups=")?;
        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;

        if !schema.fields().is_empty() {
            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
        }

        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }

        display_orderings(f, &orderings)?;

        if !self.constraints.is_empty() {
            write!(f, ", {}", self.constraints)?;
        }

        Ok(())
    }
}
1321
1322/// Get the indices of columns in a projection if the projection is a simple
1323/// list of columns.
1324/// If there are any expressions other than columns, returns None.
1325fn ordered_column_indices_from_projection(
1326    projection: &ProjectionExprs,
1327) -> Option<Vec<usize>> {
1328    projection
1329        .expr_iter()
1330        .map(|e| {
1331            let index = e.as_any().downcast_ref::<Column>()?.index();
1332            Some(index)
1333        })
1334        .collect::<Option<Vec<usize>>>()
1335}
1336
1337/// Check whether a given ordering is valid for all file groups by verifying
1338/// that files within each group are sorted according to their min/max statistics.
1339///
1340/// For single-file (or empty) groups, the ordering is trivially valid.
1341/// For multi-file groups, we check that the min/max statistics for the sort
1342/// columns are in order and non-overlapping (or touching at boundaries).
1343///
1344/// `projection` maps projected column indices back to table-schema indices
1345/// when validating after projection; pass `None` when validating at
1346/// table-schema level.
1347fn is_ordering_valid_for_file_groups(
1348    file_groups: &[FileGroup],
1349    ordering: &LexOrdering,
1350    schema: &SchemaRef,
1351    projection: Option<&[usize]>,
1352) -> bool {
1353    file_groups.iter().all(|group| {
1354        if group.len() <= 1 {
1355            return true; // single-file groups are trivially sorted
1356        }
1357        match MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
1358        {
1359            Ok(stats) => stats.is_sorted(),
1360            Err(_) => false, // can't prove sorted → reject
1361        }
1362    })
1363}
1364
1365/// Filters orderings to retain only those valid for all file groups,
1366/// verified via min/max statistics.
1367fn validate_orderings(
1368    orderings: &[LexOrdering],
1369    schema: &SchemaRef,
1370    file_groups: &[FileGroup],
1371    projection: Option<&[usize]>,
1372) -> Vec<LexOrdering> {
1373    orderings
1374        .iter()
1375        .filter(|ordering| {
1376            is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection)
1377        })
1378        .cloned()
1379        .collect()
1380}
1381
1382/// The various listing tables does not attempt to read all files
1383/// concurrently, instead they will read files in sequence within a
1384/// partition.  This is an important property as it allows plans to
1385/// run against 1000s of files and not try to open them all
1386/// concurrently.
1387///
1388/// However, it means if we assign more than one file to a partition
1389/// the output sort order will not be preserved as illustrated in the
1390/// following diagrams:
1391///
1392/// When only 1 file is assigned to each partition, each partition is
1393/// correctly sorted on `(A, B, C)`
1394///
1395/// ```text
1396/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
1397///   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
1398/// ┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
1399///   │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
1400/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
1401///   │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
1402/// ┃                                          │                    │                     ┃
1403///   │                   │ │                    │                    │                 │
1404/// ┃                                          │                    │                     ┃
1405///   │                   │ │                    │                    │                 │
1406/// ┃                                          │                    │                     ┃
1407///   │                   │ │                    │                    │                 │
1408/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1409///      DataFusion           DataFusion           DataFusion           DataFusion
1410/// ┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
1411///  ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1412///
1413///                                      DataSourceExec
1414/// ```
1415///
1416/// However, when more than 1 file is assigned to each partition, each
1417/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
1418/// file is scanned, the same values for A, B and C can be repeated in
1419/// the same sorted stream
1420///
1421///```text
1422/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1423///   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1424/// ┃   ┌───────────────┐     ┌──────────────┐ │
1425///   │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
1426/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1427///   │ └───────────────┘ │ │ └──────────────┘   ┃
1428/// ┃   ┌───────────────┐     ┌──────────────┐ │
1429///   │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
1430/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1431///   │ └───────────────┘ │ │ └──────────────┘   ┃
1432/// ┃                                          │
1433///   │                   │ │                    ┃
1434/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
1435///      DataFusion           DataFusion         ┃
1436/// ┃    Partition 1          Partition 2
1437///  ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
1438///
1439///              DataSourceExec
1440/// ```
fn get_projected_output_ordering(
    base_config: &FileScanConfig,
    projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
    // Re-express the declared orderings in terms of the projected schema.
    let projected_orderings =
        project_orderings(&base_config.output_ordering, projected_schema);

    // Tri-state:
    //   None             → the source has no projection at all;
    //   Some(Some(idxs)) → projection is a simple column list (indices known);
    //   Some(None)       → projection contains computed expressions.
    let indices = base_config
        .file_source
        .projection()
        .as_ref()
        .map(|p| ordered_column_indices_from_projection(p));

    match indices {
        Some(Some(indices)) => {
            // Simple column projection — validate with statistics
            validate_orderings(
                &projected_orderings,
                projected_schema,
                &base_config.file_groups,
                Some(indices.as_slice()),
            )
        }
        None => {
            // No projection — validate with statistics (no remapping needed)
            validate_orderings(
                &projected_orderings,
                projected_schema,
                &base_config.file_groups,
                None,
            )
        }
        Some(None) => {
            // Complex projection (expressions, not simple columns) — can't
            // determine column indices for statistics. Still valid if all
            // file groups have at most one file.
            if base_config.file_groups.iter().all(|g| g.len() <= 1) {
                projected_orderings
            } else {
                debug!(
                    "Skipping specified output orderings. \
                     Some file groups couldn't be determined to be sorted: {:?}",
                    base_config.file_groups
                );
                vec![]
            }
        }
    }
}
1490
1491/// Convert type to a type suitable for use as a `ListingTable`
1492/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
1493/// a reasonable trade off between a reasonable number of partition
1494/// values and space efficiency.
1495///
1496/// This use this to specify types for partition columns. However
1497/// you MAY also choose not to dictionary-encode the data or to use a
1498/// different dictionary type.
1499///
1500/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same say.
1501pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1502    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1503}
1504
1505/// Convert a [`ScalarValue`] of partition columns to a type, as
1506/// described in the documentation of [`wrap_partition_type_in_dict`],
1507/// which can wrap the types.
1508pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1509    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1510}
1511
1512#[cfg(test)]
1513mod tests {
1514    use std::collections::HashMap;
1515
1516    use super::*;
1517    use crate::TableSchema;
1518    use crate::test_util::col;
1519    use crate::{
1520        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1521        verify_sort_integrity,
1522    };
1523
1524    use arrow::datatypes::Field;
1525    use datafusion_common::stats::Precision;
1526    use datafusion_common::{ColumnStatistics, internal_err};
1527    use datafusion_expr::{Operator, SortExpr};
1528    use datafusion_physical_expr::create_physical_sort_expr;
1529    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
1530    use datafusion_physical_expr::projection::ProjectionExpr;
1531    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
1532
    /// Test double for [`FileSource`] whose `try_pushdown_sort` always
    /// reports `Inexact` (see the trait impl below).
    #[derive(Clone)]
    struct InexactSortPushdownSource {
        // Metrics registry required by the `FileSource` trait.
        metrics: ExecutionPlanMetricsSet,
        // Schema (file + partition columns) served by this source.
        table_schema: TableSchema,
    }
1538
1539    impl InexactSortPushdownSource {
1540        fn new(table_schema: TableSchema) -> Self {
1541            Self {
1542                metrics: ExecutionPlanMetricsSet::new(),
1543                table_schema,
1544            }
1545        }
1546    }
1547
    impl FileSource for InexactSortPushdownSource {
        fn create_file_opener(
            &self,
            _object_store: Arc<dyn object_store::ObjectStore>,
            _base_config: &FileScanConfig,
            _partition: usize,
        ) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
            // Never called: the tests using this source exercise planning
            // only, not execution.
            unimplemented!()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn table_schema(&self) -> &TableSchema {
            &self.table_schema
        }

        fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
            // Batch size plays no role for this mock; return an unchanged copy.
            Arc::new(self.clone())
        }

        fn metrics(&self) -> &ExecutionPlanMetricsSet {
            &self.metrics
        }

        fn file_type(&self) -> &str {
            "mock"
        }

        fn try_pushdown_sort(
            &self,
            _order: &[PhysicalSortExpr],
            _eq_properties: &EquivalenceProperties,
        ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
            // Always claim the requested ordering can only be satisfied
            // inexactly, regardless of the actual order requested.
            Ok(SortOrderPushdownResult::Inexact {
                inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
            })
        }
    }
1588
1589    #[test]
1590    fn physical_plan_config_no_projection_tab_cols_as_field() {
1591        let file_schema = aggr_test_schema();
1592
1593        // make a table_partition_col as a field
1594        let table_partition_col =
1595            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1596                .with_metadata(HashMap::from_iter(vec![(
1597                    "key_whatever".to_owned(),
1598                    "value_whatever".to_owned(),
1599                )]));
1600
1601        let conf = config_for_projection(
1602            Arc::clone(&file_schema),
1603            None,
1604            Statistics::new_unknown(&file_schema),
1605            vec![table_partition_col.clone()],
1606        );
1607
1608        // verify the proj_schema includes the last column and exactly the same the field it is defined
1609        let proj_schema = conf.projected_schema().unwrap();
1610        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1611        assert_eq!(
1612            *proj_schema.field(file_schema.fields().len()),
1613            table_partition_col,
1614            "partition columns are the last columns and ust have all values defined in created field"
1615        );
1616    }
1617
    /// Tests [`FileScanConfig::split_groups_by_statistics`]: files should be
    /// split into groups that are each sorted on the given sort order,
    /// according to the per-file min/max column statistics.
    #[test]
    fn test_split_groups_by_statistics() -> Result<()> {
        use chrono::TimeZone;
        use datafusion_common::DFSchema;
        use datafusion_expr::execution_props::ExecutionProps;
        use object_store::{ObjectMeta, path::Path};

        // A test file: its short name, a partition date, and per-column
        // (min, max) statistics; `None` means statistics are missing.
        struct File {
            name: &'static str,
            date: &'static str,
            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
        }
        impl File {
            // Build a file whose min/max values are all non-null.
            fn new(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(f64, f64)>>,
            ) -> Self {
                Self::new_nullable(
                    name,
                    date,
                    statistics
                        .into_iter()
                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
                        .collect(),
                )
            }

            // Build a file whose min/max values may be null (`None`).
            fn new_nullable(
                name: &'static str,
                date: &'static str,
                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
            ) -> Self {
                Self {
                    name,
                    date,
                    statistics,
                }
            }
        }

        // One scenario: input files plus requested sort, and either the
        // expected file groups (by file name) or an expected error message.
        struct TestCase {
            name: &'static str,
            file_schema: Schema,
            files: Vec<File>,
            sort: Vec<SortExpr>,
            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
        }

        use datafusion_expr::col;
        let cases = vec![
            TestCase {
                name: "test sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            // same input but file '2' is in the middle
            // test that we still order correctly
            TestCase {
                name: "test sort with files ordered differently",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "reverse sort",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
                ],
                sort: vec![col("value").sort(false, true)],
                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
            },
            TestCase {
                name: "nullable sort columns, nulls last",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable(
                        "0",
                        "2023-01-01",
                        vec![Some((Some(0.00), Some(0.49)))],
                    ),
                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "nullable sort columns, nulls first",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    true,
                )]),
                files: vec![
                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
                    File::new_nullable(
                        "1",
                        "2023-01-01",
                        vec![Some((Some(0.50), Some(1.00)))],
                    ),
                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
                ],
                sort: vec![col("value").sort(true, true)],
                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
            },
            TestCase {
                name: "all three non-overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0", "1", "2"]]),
            },
            TestCase {
                name: "all three overlapping",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
            },
            TestCase {
                name: "empty input",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![],
                sort: vec![col("value").sort(true, false)],
                expected_result: Ok(vec![]),
            },
            TestCase {
                name: "one file missing statistics",
                file_schema: Schema::new(vec![Field::new(
                    "value".to_string(),
                    DataType::Float64,
                    false,
                )]),
                files: vec![
                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
                    File::new("2", "2023-01-02", vec![None]),
                ],
                sort: vec![col("value").sort(true, false)],
                expected_result: Err(
                    "construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
                ),
            },
        ];

        for case in cases {
            // Table schema = the case's file schema plus a "date" partition column.
            let table_schema = Arc::new(Schema::new(
                case.file_schema
                    .fields()
                    .clone()
                    .into_iter()
                    .cloned()
                    .chain(Some(Arc::new(Field::new(
                        "date".to_string(),
                        DataType::Utf8,
                        false,
                    ))))
                    .collect::<Vec<_>>(),
            ));
            let Some(sort_order) = LexOrdering::new(
                case.sort
                    .into_iter()
                    .map(|expr| {
                        create_physical_sort_expr(
                            &expr,
                            &DFSchema::try_from(Arc::clone(&table_schema))?,
                            &ExecutionProps::default(),
                        )
                    })
                    .collect::<Result<Vec<_>>>()?,
            ) else {
                return internal_err!("This test should always use an ordering");
            };

            // All input files start in a single group; the function under
            // test re-splits them by statistics.
            let partitioned_files = FileGroup::new(
                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
            );
            let result = FileScanConfig::split_groups_by_statistics(
                &table_schema,
                std::slice::from_ref(&partitioned_files),
                &sort_order,
            );
            // Map each resulting group back to short file names (strip the
            // directory prefix and ".parquet" suffix) for comparison.
            let results_by_name = result
                .as_ref()
                .map(|file_groups| {
                    file_groups
                        .iter()
                        .map(|file_group| {
                            file_group
                                .iter()
                                .map(|file| {
                                    partitioned_files
                                        .iter()
                                        .find_map(|f| {
                                            if f.object_meta == file.object_meta {
                                                Some(
                                                    f.object_meta
                                                        .location
                                                        .as_ref()
                                                        .rsplit('/')
                                                        .next()
                                                        .unwrap()
                                                        .trim_end_matches(".parquet"),
                                                )
                                            } else {
                                                None
                                            }
                                        })
                                        .unwrap()
                                })
                                .collect::<Vec<_>>()
                        })
                        .collect::<Vec<_>>()
                })
                // Leak the error string to obtain a `&'static str` for
                // comparison with the expected message (fine in a test).
                .map_err(|e| e.strip_backtrace().leak() as &'static str);

            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
        }

        // Early return; the trailing `impl` below is still compiled but
        // carries no runtime code of its own.
        return Ok(());

        // Convert the test-local `File` description into a `PartitionedFile`
        // with exact min/max column statistics attached.
        impl From<File> for PartitionedFile {
            fn from(file: File) -> Self {
                let object_meta = ObjectMeta {
                    location: Path::from(format!(
                        "data/date={}/{}.parquet",
                        file.date, file.name
                    )),
                    last_modified: chrono::Utc.timestamp_nanos(0),
                    size: 0,
                    e_tag: None,
                    version: None,
                };
                let statistics = Arc::new(Statistics {
                    num_rows: Precision::Absent,
                    total_byte_size: Precision::Absent,
                    column_statistics: file
                        .statistics
                        .into_iter()
                        .map(|stats| {
                            stats
                                .map(|(min, max)| ColumnStatistics {
                                    min_value: Precision::Exact(ScalarValue::Float64(
                                        min,
                                    )),
                                    max_value: Precision::Exact(ScalarValue::Float64(
                                        max,
                                    )),
                                    ..Default::default()
                                })
                                // Missing statistics for this column.
                                .unwrap_or_default()
                        })
                        .collect::<Vec<_>>(),
                });
                PartitionedFile::new_from_meta(object_meta)
                    .with_partition_values(vec![ScalarValue::from(file.date)])
                    .with_statistics(statistics)
            }
        }
    }
1929
1930    // sets default for configs that play no role in projections
1931    fn config_for_projection(
1932        file_schema: SchemaRef,
1933        projection: Option<Vec<usize>>,
1934        statistics: Statistics,
1935        table_partition_cols: Vec<Field>,
1936    ) -> FileScanConfig {
1937        let table_schema = TableSchema::new(
1938            file_schema,
1939            table_partition_cols.into_iter().map(Arc::new).collect(),
1940        );
1941        FileScanConfigBuilder::new(
1942            ObjectStoreUrl::parse("test:///").unwrap(),
1943            Arc::new(MockSource::new(table_schema.clone())),
1944        )
1945        .with_projection_indices(projection)
1946        .unwrap()
1947        .with_statistics(statistics)
1948        .build()
1949    }
1950
    /// Checks that every [`FileScanConfigBuilder`] setter is reflected in
    /// the built [`FileScanConfig`].
    #[test]
    fn test_file_scan_config_builder() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();

        // Table schema = file schema + a dictionary-encoded "date" partition column.
        let table_schema = TableSchema::new(
            Arc::clone(&file_schema),
            vec![Arc::new(Field::new(
                "date",
                wrap_partition_type_in_dict(DataType::Utf8),
                false,
            ))],
        );

        let file_source: Arc<dyn FileSource> =
            Arc::new(MockSource::new(table_schema.clone()));

        // Create a builder with required parameters
        let builder = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_source),
        );

        // Build with various configurations
        let config = builder
            .with_limit(Some(1000))
            .with_projection_indices(Some(vec![0, 1]))
            .unwrap()
            .with_statistics(Statistics::new_unknown(&file_schema))
            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
                "test.parquet".to_string(),
                1024,
            )])])
            .with_output_ordering(vec![
                [PhysicalSortExpr::new_default(Arc::new(Column::new(
                    "date", 0,
                )))]
                .into(),
            ])
            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
            .build();

        // Verify the built config has all the expected values
        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(*config.file_schema(), file_schema);
        assert_eq!(config.limit, Some(1000));
        // Projection indices set on the builder end up on the file source.
        assert_eq!(
            config
                .file_source
                .projection()
                .as_ref()
                .map(|p| p.column_indices()),
            Some(vec![0, 1])
        );
        assert_eq!(config.table_partition_cols().len(), 1);
        assert_eq!(config.table_partition_cols()[0].name(), "date");
        assert_eq!(config.file_groups.len(), 1);
        assert_eq!(config.file_groups[0].len(), 1);
        assert_eq!(
            config.file_groups[0][0].object_meta.location.as_ref(),
            "test.parquet"
        );
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert_eq!(config.output_ordering.len(), 1);
    }
2019
2020    #[test]
2021    fn equivalence_properties_after_schema_change() {
2022        let file_schema = aggr_test_schema();
2023        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2024
2025        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2026
2027        // Create a file source with a filter
2028        let file_source: Arc<dyn FileSource> = Arc::new(
2029            MockSource::new(table_schema.clone()).with_filter(Arc::new(BinaryExpr::new(
2030                col("c2", &file_schema).unwrap(),
2031                Operator::Eq,
2032                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2033            ))),
2034        );
2035
2036        let config = FileScanConfigBuilder::new(
2037            object_store_url.clone(),
2038            Arc::clone(&file_source),
2039        )
2040        .with_projection_indices(Some(vec![0, 1, 2]))
2041        .unwrap()
2042        .build();
2043
2044        // Simulate projection being updated. Since the filter has already been pushed down,
2045        // the new projection won't include the filtered column.
2046        let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
2047            col("c1", &file_schema).unwrap(),
2048            "c1",
2049        )]);
2050        let data_source = config
2051            .try_swapping_with_projection(&exprs)
2052            .unwrap()
2053            .unwrap();
2054
2055        // Gather the equivalence properties from the new data source. There should
2056        // be no equivalence class for column c2 since it was removed by the projection.
2057        let eq_properties = data_source.eq_properties();
2058        let eq_group = eq_properties.eq_group();
2059
2060        for class in eq_group.iter() {
2061            for expr in class.iter() {
2062                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
2063                    assert_ne!(
2064                        col.name(),
2065                        "c2",
2066                        "c2 should not be present in any equivalence class"
2067                    );
2068                }
2069            }
2070        }
2071    }
2072
    /// A builder with only the required parameters must produce a config
    /// with documented defaults (no limit, full-column projection, unknown
    /// statistics, empty file groups/orderings/constraints).
    #[test]
    fn test_file_scan_config_builder_defaults() {
        let file_schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();

        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);

        let file_source: Arc<dyn FileSource> =
            Arc::new(MockSource::new(table_schema.clone()));

        // Create a builder with only required parameters and build without any additional configurations
        let config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_source),
        )
        .build();

        // Verify default values
        assert_eq!(config.object_store_url, object_store_url);
        assert_eq!(*config.file_schema(), file_schema);
        assert_eq!(config.limit, None);
        // When no projection is specified, the file source should have an unprojected projection
        // (i.e., all columns)
        let expected_projection: Vec<usize> = (0..file_schema.fields().len()).collect();
        assert_eq!(
            config
                .file_source
                .projection()
                .as_ref()
                .map(|p| p.column_indices()),
            Some(expected_projection)
        );
        assert!(config.table_partition_cols().is_empty());
        assert!(config.file_groups.is_empty());
        assert_eq!(
            config.file_compression_type,
            FileCompressionType::UNCOMPRESSED
        );
        assert!(config.output_ordering.is_empty());
        assert!(config.constraints.is_empty());

        // Verify statistics are set to unknown
        assert_eq!(config.statistics().num_rows, Precision::Absent);
        assert_eq!(config.statistics().total_byte_size, Precision::Absent);
        assert_eq!(
            config.statistics().column_statistics.len(),
            file_schema.fields().len()
        );
        // Every per-column statistic should be Absent (unknown).
        for stat in config.statistics().column_statistics {
            assert_eq!(stat.distinct_count, Precision::Absent);
            assert_eq!(stat.min_value, Precision::Absent);
            assert_eq!(stat.max_value, Precision::Absent);
            assert_eq!(stat.null_count, Precision::Absent);
        }
    }
2128
    /// Round trip: building a [`FileScanConfigBuilder`] from an existing
    /// config and rebuilding must preserve all properties.
    #[test]
    fn test_file_scan_config_builder_new_from() {
        let schema = aggr_test_schema();
        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
        let partition_cols = vec![Field::new(
            "date",
            wrap_partition_type_in_dict(DataType::Utf8),
            false,
        )];
        let file = PartitionedFile::new("test_file.parquet", 100);

        let table_schema = TableSchema::new(
            Arc::clone(&schema),
            partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
        );

        let file_source: Arc<dyn FileSource> =
            Arc::new(MockSource::new(table_schema.clone()));

        // Create a config with non-default values
        let original_config = FileScanConfigBuilder::new(
            object_store_url.clone(),
            Arc::clone(&file_source),
        )
        .with_projection_indices(Some(vec![0, 2]))
        .unwrap()
        .with_limit(Some(10))
        .with_file(file.clone())
        .with_constraints(Constraints::default())
        .build();

        // Create a new builder from the config
        let new_builder = FileScanConfigBuilder::from(original_config);

        // Build a new config from this builder
        let new_config = new_builder.build();

        // Verify properties match
        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
        assert_eq!(new_config.object_store_url, object_store_url);
        assert_eq!(*new_config.file_schema(), schema);
        assert_eq!(
            new_config
                .file_source
                .projection()
                .as_ref()
                .map(|p| p.column_indices()),
            Some(vec![0, 2])
        );
        assert_eq!(new_config.limit, Some(10));
        assert_eq!(*new_config.table_partition_cols(), partition_cols);
        assert_eq!(new_config.file_groups.len(), 1);
        assert_eq!(new_config.file_groups[0].len(), 1);
        assert_eq!(
            new_config.file_groups[0][0].object_meta.location.as_ref(),
            "test_file.parquet"
        );
        assert_eq!(new_config.constraints, Constraints::default());
    }
2188
    #[test]
    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
        use datafusion_common::DFSchema;
        use datafusion_expr::{col, execution_props::ExecutionProps};

        // Single Float64 column; the per-file min/max statistics on this
        // column drive the statistics-based split under test.
        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Float64,
            false,
        )]));

        // Setup sort expression: ascending on `value`, converted into a
        // physical sort ordering against the test schema.
        let exec_props = ExecutionProps::new();
        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
        let sort_expr = [col("value").sort(true, false)];
        let sort_ordering = sort_expr
            .map(|expr| {
                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
            })
            .into();

        // Test case parameters
        struct TestCase {
            // Human-readable label used in assertion messages
            name: String,
            // Number of input files to generate
            file_count: usize,
            // Degree of statistics-range overlap between adjacent files
            // (0.0 = disjoint ranges; higher values force extra partitions
            // to keep each partition internally sorted)
            overlap_factor: f64,
            // Requested number of output partitions
            target_partitions: usize,
            // Number of partitions the split is expected to produce
            expected_partition_count: usize,
        }

        let test_cases = vec![
            // Basic cases
            TestCase {
                name: "no_overlap_10_files_4_partitions".to_string(),
                file_count: 10,
                overlap_factor: 0.0,
                target_partitions: 4,
                expected_partition_count: 4,
            },
            TestCase {
                name: "medium_overlap_20_files_5_partitions".to_string(),
                file_count: 20,
                overlap_factor: 0.5,
                target_partitions: 5,
                expected_partition_count: 5,
            },
            TestCase {
                name: "high_overlap_30_files_3_partitions".to_string(),
                file_count: 30,
                overlap_factor: 0.8,
                target_partitions: 3,
                expected_partition_count: 7,
            },
            // Edge cases
            TestCase {
                name: "fewer_files_than_partitions".to_string(),
                file_count: 3,
                overlap_factor: 0.0,
                target_partitions: 10,
                expected_partition_count: 3, // Should only create as many partitions as files
            },
            TestCase {
                name: "single_file".to_string(),
                file_count: 1,
                overlap_factor: 0.0,
                target_partitions: 5,
                expected_partition_count: 1, // Should create only one partition
            },
            TestCase {
                name: "empty_files".to_string(),
                file_count: 0,
                overlap_factor: 0.0,
                target_partitions: 3,
                expected_partition_count: 0, // Empty result for empty input
            },
        ];

        for case in test_cases {
            println!("Running test case: {}", case.name);

            // Generate files using bench utility function
            let file_groups = generate_test_files(case.file_count, case.overlap_factor);

            // Call the function under test
            let result =
                FileScanConfig::split_groups_by_statistics_with_target_partitions(
                    &schema,
                    &file_groups,
                    &sort_ordering,
                    case.target_partitions,
                )?;

            // Verify results
            println!(
                "Created {} partitions (target was {})",
                result.len(),
                case.target_partitions
            );

            // Check partition count
            assert_eq!(
                result.len(),
                case.expected_partition_count,
                "Case '{}': Unexpected partition count",
                case.name
            );

            // Verify sort integrity
            assert!(
                verify_sort_integrity(&result),
                "Case '{}': Files within partitions are not properly ordered",
                case.name
            );

            // Distribution check for partitions
            if case.file_count > 1 && case.expected_partition_count > 1 {
                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
                let max_size = *group_sizes.iter().max().unwrap();
                let min_size = *group_sizes.iter().min().unwrap();

                // Check partition balancing - difference shouldn't be extreme
                let avg_files_per_partition =
                    case.file_count as f64 / case.expected_partition_count as f64;
                assert!(
                    (max_size as f64) < 2.0 * avg_files_per_partition,
                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
                    case.name,
                    max_size,
                    avg_files_per_partition
                );

                println!("Distribution - min files: {min_size}, max files: {max_size}");
            }
        }

        // Test error case: zero target partitions
        let empty_groups: Vec<FileGroup> = vec![];
        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
            &schema,
            &empty_groups,
            &sort_ordering,
            0,
        )
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("target_partitions must be greater than 0"),
            "Expected error for zero target partitions"
        );

        Ok(())
    }
2342
2343    #[test]
2344    fn test_partition_statistics_projection() {
2345        // This test verifies that partition_statistics applies projection correctly.
2346        // The old implementation had a bug where it returned file group statistics
2347        // without applying the projection, returning all column statistics instead
2348        // of just the projected ones.
2349
2350        use crate::source::DataSourceExec;
2351        use datafusion_physical_plan::ExecutionPlan;
2352
2353        // Create a schema with 4 columns
2354        let schema = Arc::new(Schema::new(vec![
2355            Field::new("col0", DataType::Int32, false),
2356            Field::new("col1", DataType::Int32, false),
2357            Field::new("col2", DataType::Int32, false),
2358            Field::new("col3", DataType::Int32, false),
2359        ]));
2360
2361        // Create statistics for all 4 columns
2362        let file_group_stats = Statistics {
2363            num_rows: Precision::Exact(100),
2364            total_byte_size: Precision::Exact(1024),
2365            column_statistics: vec![
2366                ColumnStatistics {
2367                    null_count: Precision::Exact(0),
2368                    ..ColumnStatistics::new_unknown()
2369                },
2370                ColumnStatistics {
2371                    null_count: Precision::Exact(5),
2372                    ..ColumnStatistics::new_unknown()
2373                },
2374                ColumnStatistics {
2375                    null_count: Precision::Exact(10),
2376                    ..ColumnStatistics::new_unknown()
2377                },
2378                ColumnStatistics {
2379                    null_count: Precision::Exact(15),
2380                    ..ColumnStatistics::new_unknown()
2381                },
2382            ],
2383        };
2384
2385        // Create a file group with statistics
2386        let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2387            .with_statistics(Arc::new(file_group_stats));
2388
2389        let table_schema = TableSchema::new(Arc::clone(&schema), vec![]);
2390
2391        // Create a FileScanConfig with projection: only keep columns 0 and 2
2392        let config = FileScanConfigBuilder::new(
2393            ObjectStoreUrl::parse("test:///").unwrap(),
2394            Arc::new(MockSource::new(table_schema.clone())),
2395        )
2396        .with_projection_indices(Some(vec![0, 2]))
2397        .unwrap() // Only project columns 0 and 2
2398        .with_file_groups(vec![file_group])
2399        .build();
2400
2401        // Create a DataSourceExec from the config
2402        let exec = DataSourceExec::from_data_source(config);
2403
2404        // Get statistics for partition 0
2405        let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2406
2407        // Verify that only 2 columns are in the statistics (the projected ones)
2408        assert_eq!(
2409            partition_stats.column_statistics.len(),
2410            2,
2411            "Expected 2 column statistics (projected), but got {}",
2412            partition_stats.column_statistics.len()
2413        );
2414
2415        // Verify the column statistics are for columns 0 and 2
2416        assert_eq!(
2417            partition_stats.column_statistics[0].null_count,
2418            Precision::Exact(0),
2419            "First projected column should be col0 with 0 nulls"
2420        );
2421        assert_eq!(
2422            partition_stats.column_statistics[1].null_count,
2423            Precision::Exact(10),
2424            "Second projected column should be col2 with 10 nulls"
2425        );
2426
2427        // Verify row count and byte size
2428        assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2429        assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
2430    }
2431
2432    #[test]
2433    fn test_output_partitioning_not_partitioned_by_file_group() {
2434        let file_schema = aggr_test_schema();
2435        let partition_col =
2436            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), false);
2437
2438        let config = config_for_projection(
2439            Arc::clone(&file_schema),
2440            None,
2441            Statistics::new_unknown(&file_schema),
2442            vec![partition_col],
2443        );
2444
2445        // partitioned_by_file_group defaults to false
2446        let partitioning = config.output_partitioning();
2447        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2448    }
2449
2450    #[test]
2451    fn test_output_partitioning_no_partition_columns() {
2452        let file_schema = aggr_test_schema();
2453        let mut config = config_for_projection(
2454            Arc::clone(&file_schema),
2455            None,
2456            Statistics::new_unknown(&file_schema),
2457            vec![], // No partition columns
2458        );
2459        config.partitioned_by_file_group = true;
2460
2461        let partitioning = config.output_partitioning();
2462        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2463    }
2464
2465    #[test]
2466    fn test_output_partitioning_with_partition_columns() {
2467        let file_schema = aggr_test_schema();
2468
2469        // Test single partition column
2470        let single_partition_col = vec![Field::new(
2471            "date",
2472            wrap_partition_type_in_dict(DataType::Utf8),
2473            false,
2474        )];
2475
2476        let mut config = config_for_projection(
2477            Arc::clone(&file_schema),
2478            None,
2479            Statistics::new_unknown(&file_schema),
2480            single_partition_col,
2481        );
2482        config.partitioned_by_file_group = true;
2483        config.file_groups = vec![
2484            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2485            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2486            FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
2487        ];
2488
2489        let partitioning = config.output_partitioning();
2490        match partitioning {
2491            Partitioning::Hash(exprs, num_partitions) => {
2492                assert_eq!(num_partitions, 3);
2493                assert_eq!(exprs.len(), 1);
2494                assert_eq!(
2495                    exprs[0].as_any().downcast_ref::<Column>().unwrap().name(),
2496                    "date"
2497                );
2498            }
2499            _ => panic!("Expected Hash partitioning"),
2500        }
2501
2502        // Test multiple partition columns
2503        let multiple_partition_cols = vec![
2504            Field::new("year", wrap_partition_type_in_dict(DataType::Utf8), false),
2505            Field::new("month", wrap_partition_type_in_dict(DataType::Utf8), false),
2506        ];
2507
2508        config = config_for_projection(
2509            Arc::clone(&file_schema),
2510            None,
2511            Statistics::new_unknown(&file_schema),
2512            multiple_partition_cols,
2513        );
2514        config.partitioned_by_file_group = true;
2515        config.file_groups = vec![
2516            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2517            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2518        ];
2519
2520        let partitioning = config.output_partitioning();
2521        match partitioning {
2522            Partitioning::Hash(exprs, num_partitions) => {
2523                assert_eq!(num_partitions, 2);
2524                assert_eq!(exprs.len(), 2);
2525                let col_names: Vec<_> = exprs
2526                    .iter()
2527                    .map(|e| e.as_any().downcast_ref::<Column>().unwrap().name())
2528                    .collect();
2529                assert_eq!(col_names, vec!["year", "month"]);
2530            }
2531            _ => panic!("Expected Hash partitioning"),
2532        }
2533    }
2534
2535    #[test]
2536    fn try_pushdown_sort_reverses_file_groups_only_when_requested_is_reverse()
2537    -> Result<()> {
2538        let file_schema =
2539            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
2540
2541        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2542        let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
2543
2544        let file_groups = vec![FileGroup::new(vec![
2545            PartitionedFile::new("file1", 1),
2546            PartitionedFile::new("file2", 1),
2547        ])];
2548
2549        let sort_expr_asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2550        let config =
2551            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2552                .with_file_groups(file_groups)
2553                .with_output_ordering(vec![
2554                    LexOrdering::new(vec![sort_expr_asc.clone()]).unwrap(),
2555                ])
2556                .build();
2557
2558        let requested_asc = vec![sort_expr_asc.clone()];
2559        let result = config.try_pushdown_sort(&requested_asc)?;
2560        let SortOrderPushdownResult::Inexact { inner } = result else {
2561            panic!("Expected Inexact result");
2562        };
2563        let pushed_config = inner
2564            .as_any()
2565            .downcast_ref::<FileScanConfig>()
2566            .expect("Expected FileScanConfig");
2567        let pushed_files = pushed_config.file_groups[0].files();
2568        assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file1");
2569        assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file2");
2570
2571        let requested_desc = vec![sort_expr_asc.reverse()];
2572        let result = config.try_pushdown_sort(&requested_desc)?;
2573        let SortOrderPushdownResult::Inexact { inner } = result else {
2574            panic!("Expected Inexact result");
2575        };
2576        let pushed_config = inner
2577            .as_any()
2578            .downcast_ref::<FileScanConfig>()
2579            .expect("Expected FileScanConfig");
2580        let pushed_files = pushed_config.file_groups[0].files();
2581        assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file2");
2582        assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file1");
2583
2584        Ok(())
2585    }
2586}