Skip to main content

datafusion_datasource/
file_scan_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21use crate::file_groups::FileGroup;
22use crate::{
23    PartitionedFile, display::FileGroupsDisplay, file::FileSource,
24    file_compression_type::FileCompressionType, file_stream::FileStream,
25    source::DataSource, statistics::MinMaxStatistics,
26};
27use arrow::datatypes::FieldRef;
28use arrow::datatypes::{DataType, Schema, SchemaRef};
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::{
31    Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
32};
33use datafusion_execution::{
34    SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
35};
36use datafusion_expr::Operator;
37
38use datafusion_physical_expr::equivalence::project_orderings;
39use datafusion_physical_expr::expressions::{BinaryExpr, Column};
40use datafusion_physical_expr::projection::ProjectionExprs;
41use datafusion_physical_expr::utils::reassign_expr_columns;
42use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
43use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
44use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
45use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
46use datafusion_physical_plan::SortOrderPushdownResult;
47use datafusion_physical_plan::coop::cooperative;
48use datafusion_physical_plan::execution_plan::SchedulingType;
49use datafusion_physical_plan::{
50    DisplayAs, DisplayFormatType,
51    display::{ProjectSchemaDisplay, display_orderings},
52    filter_pushdown::FilterPushdownPropagation,
53    metrics::ExecutionPlanMetricsSet,
54};
55use log::{debug, warn};
56use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
57
58/// The base configurations for a [`DataSourceExec`], the a physical plan for
59/// any given file format.
60///
61/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a ``FileScanConfig`.
62///
63/// # Example
64/// ```
65/// # use std::any::Any;
66/// # use std::sync::Arc;
67/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
68/// # use object_store::ObjectStore;
69/// # use datafusion_common::Result;
70/// # use datafusion_datasource::file::FileSource;
71/// # use datafusion_datasource::file_groups::FileGroup;
72/// # use datafusion_datasource::PartitionedFile;
73/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
74/// # use datafusion_datasource::file_stream::FileOpener;
75/// # use datafusion_datasource::source::DataSourceExec;
76/// # use datafusion_datasource::table_schema::TableSchema;
77/// # use datafusion_execution::object_store::ObjectStoreUrl;
78/// # use datafusion_physical_expr::projection::ProjectionExprs;
79/// # use datafusion_physical_plan::ExecutionPlan;
80/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
81/// # let file_schema = Arc::new(Schema::new(vec![
82/// #  Field::new("c1", DataType::Int32, false),
83/// #  Field::new("c2", DataType::Int32, false),
84/// #  Field::new("c3", DataType::Int32, false),
85/// #  Field::new("c4", DataType::Int32, false),
86/// # ]));
87/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
88/// #[derive(Clone)]
89/// # struct ParquetSource {
90/// #    table_schema: TableSchema,
91/// # };
92/// # impl FileSource for ParquetSource {
93/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Result<Arc<dyn FileOpener>> { unimplemented!() }
94/// #  fn as_any(&self) -> &dyn Any { self  }
95/// #  fn table_schema(&self) -> &TableSchema { &self.table_schema }
96/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
97/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
98/// #  fn file_type(&self) -> &str { "parquet" }
99/// #  // Note that this implementation drops the projection on the floor, it is not complete!
100/// #  fn try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result<Option<Arc<dyn FileSource>>> { Ok(Some(Arc::new(self.clone()) as Arc<dyn FileSource>)) }
101/// #  }
102/// # impl ParquetSource {
103/// #  fn new(table_schema: impl Into<TableSchema>) -> Self { Self {table_schema: table_schema.into()} }
104/// # }
105/// // create FileScan config for reading parquet files from file://
106/// let object_store_url = ObjectStoreUrl::local_filesystem();
107/// let file_source = Arc::new(ParquetSource::new(file_schema.clone()));
108/// let config = FileScanConfigBuilder::new(object_store_url, file_source)
109///   .with_limit(Some(1000))            // read only the first 1000 records
110///   .with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3
111///   .expect("Failed to push down projection")
112///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
113///   .with_file(PartitionedFile::new("file1.parquet", 1234))
114///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
115///   // in a  single row group
116///   .with_file_group(FileGroup::new(vec![
117///    PartitionedFile::new("file2.parquet", 56),
118///    PartitionedFile::new("file3.parquet", 78),
119///   ])).build();
120/// // create an execution plan from the config
121/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
122/// ```
123///
124/// [`DataSourceExec`]: crate::source::DataSourceExec
125/// [`DataSourceExec::from_data_source`]: crate::source::DataSourceExec::from_data_source
126#[derive(Clone)]
127pub struct FileScanConfig {
128    /// Object store URL, used to get an [`ObjectStore`] instance from
129    /// [`RuntimeEnv::object_store`]
130    ///
131    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
132    /// as `file://` or `s3://my_bucket`. It should not include the path to the
133    /// file itself. The relevant URL prefix must be registered via
134    /// [`RuntimeEnv::register_object_store`]
135    ///
136    /// [`ObjectStore`]: object_store::ObjectStore
137    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
138    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
139    pub object_store_url: ObjectStoreUrl,
140    /// List of files to be processed, grouped into partitions
141    ///
142    /// Each file must have a schema of `file_schema` or a subset. If
143    /// a particular file has a subset, the missing columns are
144    /// padded with NULLs.
145    ///
146    /// DataFusion may attempt to read each partition of files
147    /// concurrently, however files *within* a partition will be read
148    /// sequentially, one after the next.
149    pub file_groups: Vec<FileGroup>,
150    /// Table constraints
151    pub constraints: Constraints,
152    /// The maximum number of records to read from this plan. If `None`,
153    /// all records after filtering are returned.
154    pub limit: Option<usize>,
155    /// All equivalent lexicographical orderings that describe the schema.
156    pub output_ordering: Vec<LexOrdering>,
157    /// File compression type
158    pub file_compression_type: FileCompressionType,
159    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
160    pub file_source: Arc<dyn FileSource>,
161    /// Batch size while creating new batches
162    /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
163    pub batch_size: Option<usize>,
164    /// Expression adapter used to adapt filters and projections that are pushed down into the scan
165    /// from the logical schema to the physical schema of the file.
166    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
167    /// Unprojected statistics for the table (file schema + partition columns).
168    /// These are projected on-demand via `projected_stats()`.
169    ///
170    /// Note that this field is pub(crate) because accessing it directly from outside
171    /// would be incorrect if there are filters being applied, thus this should be accessed
172    /// via [`FileScanConfig::statistics`].
173    pub(crate) statistics: Statistics,
174    /// When true, file_groups are organized by partition column values
175    /// and output_partitioning will return Hash partitioning on partition columns.
176    /// This allows the optimizer to skip hash repartitioning for aggregates and joins
177    /// on partition columns.
178    ///
179    /// If the number of file partitions > target_partitions, the file partitions will be grouped
180    /// in a round-robin fashion such that number of file partitions = target_partitions.
181    pub partitioned_by_file_group: bool,
182}
183
184/// A builder for [`FileScanConfig`]'s.
185///
186/// Example:
187///
188/// ```rust
189/// # use std::sync::Arc;
190/// # use arrow::datatypes::{DataType, Field, Schema};
191/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig};
192/// # use datafusion_datasource::file_compression_type::FileCompressionType;
193/// # use datafusion_datasource::file_groups::FileGroup;
194/// # use datafusion_datasource::PartitionedFile;
195/// # use datafusion_datasource::table_schema::TableSchema;
196/// # use datafusion_execution::object_store::ObjectStoreUrl;
197/// # use datafusion_common::Statistics;
198/// # use datafusion_datasource::file::FileSource;
199///
200/// # fn main() {
201/// # fn with_source(file_source: Arc<dyn FileSource>) {
202///     // Create a schema for our Parquet files
203///     let file_schema = Arc::new(Schema::new(vec![
204///         Field::new("id", DataType::Int32, false),
205///         Field::new("value", DataType::Utf8, false),
206///     ]));
207///
208///     // Create partition columns
209///     let partition_cols = vec![
210///         Arc::new(Field::new("date", DataType::Utf8, false)),
211///     ];
212///
213///     // Create table schema with file schema and partition columns
214///     let table_schema = TableSchema::new(file_schema, partition_cols);
215///
216///     // Create a builder for scanning Parquet files from a local filesystem
217///     let config = FileScanConfigBuilder::new(
218///         ObjectStoreUrl::local_filesystem(),
219///         file_source,
220///     )
221///     // Set a limit of 1000 rows
222///     .with_limit(Some(1000))
223///     // Project only the first column
224///     .with_projection_indices(Some(vec![0]))
225///     .expect("Failed to push down projection")
226///     // Add a file group with two files
227///     .with_file_group(FileGroup::new(vec![
228///         PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024),
229///         PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048),
230///     ]))
231///     // Set compression type
232///     .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
233///     // Build the final config
234///     .build();
235/// # }
236/// # }
237/// ```
238#[derive(Clone)]
239pub struct FileScanConfigBuilder {
240    object_store_url: ObjectStoreUrl,
241    file_source: Arc<dyn FileSource>,
242    limit: Option<usize>,
243    constraints: Option<Constraints>,
244    file_groups: Vec<FileGroup>,
245    statistics: Option<Statistics>,
246    output_ordering: Vec<LexOrdering>,
247    file_compression_type: Option<FileCompressionType>,
248    batch_size: Option<usize>,
249    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
250    partitioned_by_file_group: bool,
251}
252
253impl FileScanConfigBuilder {
254    /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files.
255    ///
256    /// # Parameters:
257    /// * `object_store_url`: See [`FileScanConfig::object_store_url`]
258    /// * `file_source`: See [`FileScanConfig::file_source`]. The file source must have
259    ///   a schema set via its constructor.
260    pub fn new(
261        object_store_url: ObjectStoreUrl,
262        file_source: Arc<dyn FileSource>,
263    ) -> Self {
264        Self {
265            object_store_url,
266            file_source,
267            file_groups: vec![],
268            statistics: None,
269            output_ordering: vec![],
270            file_compression_type: None,
271            limit: None,
272            constraints: None,
273            batch_size: None,
274            expr_adapter_factory: None,
275            partitioned_by_file_group: false,
276        }
277    }
278
279    /// Set the maximum number of records to read from this plan. If `None`,
280    /// all records after filtering are returned.
281    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
282        self.limit = limit;
283        self
284    }
285
286    /// Set the file source for scanning files.
287    ///
288    /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
289    /// after the builder has been created.
290    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
291        self.file_source = file_source;
292        self
293    }
294
295    pub fn table_schema(&self) -> &SchemaRef {
296        self.file_source.table_schema().table_schema()
297    }
298
299    /// Set the columns on which to project the data. Indexes that are higher than the
300    /// number of columns of `file_schema` refer to `table_partition_cols`.
301    ///
302    /// # Deprecated
303    /// Use [`Self::with_projection_indices`] instead. This method will be removed in a future release.
304    #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
305    pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
306        match self.clone().with_projection_indices(indices) {
307            Ok(builder) => builder,
308            Err(e) => {
309                warn!(
310                    "Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
311                );
312                self
313            }
314        }
315    }
316
317    /// Set the columns on which to project the data using column indices.
318    ///
319    /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`.
320    pub fn with_projection_indices(
321        mut self,
322        indices: Option<Vec<usize>>,
323    ) -> Result<Self> {
324        let projection_exprs = indices.map(|indices| {
325            ProjectionExprs::from_indices(
326                &indices,
327                self.file_source.table_schema().table_schema(),
328            )
329        });
330        let Some(projection_exprs) = projection_exprs else {
331            return Ok(self);
332        };
333        let new_source = self
334            .file_source
335            .try_pushdown_projection(&projection_exprs)
336            .map_err(|e| {
337                internal_datafusion_err!(
338                    "Failed to push down projection in FileScanConfigBuilder::build: {e}"
339                )
340            })?;
341        if let Some(new_source) = new_source {
342            self.file_source = new_source;
343        } else {
344            internal_err!(
345                "FileSource {} does not support projection pushdown",
346                self.file_source.file_type()
347            )?;
348        }
349        Ok(self)
350    }
351
352    /// Set the table constraints
353    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
354        self.constraints = Some(constraints);
355        self
356    }
357
358    /// Set the estimated overall statistics of the files, taking `filters` into account.
359    /// Defaults to [`Statistics::new_unknown`].
360    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
361        self.statistics = Some(statistics);
362        self
363    }
364
365    /// Set the list of files to be processed, grouped into partitions.
366    ///
367    /// Each file must have a schema of `file_schema` or a subset. If
368    /// a particular file has a subset, the missing columns are
369    /// padded with NULLs.
370    ///
371    /// DataFusion may attempt to read each partition of files
372    /// concurrently, however files *within* a partition will be read
373    /// sequentially, one after the next.
374    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
375        self.file_groups = file_groups;
376        self
377    }
378
379    /// Add a new file group
380    ///
381    /// See [`Self::with_file_groups`] for more information
382    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
383        self.file_groups.push(file_group);
384        self
385    }
386
387    /// Add a file as a single group
388    ///
389    /// See [`Self::with_file_groups`] for more information.
390    pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
391        self.with_file_group(FileGroup::new(vec![partitioned_file]))
392    }
393
394    /// Set the output ordering of the files
395    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
396        self.output_ordering = output_ordering;
397        self
398    }
399
400    /// Set the file compression type
401    pub fn with_file_compression_type(
402        mut self,
403        file_compression_type: FileCompressionType,
404    ) -> Self {
405        self.file_compression_type = Some(file_compression_type);
406        self
407    }
408
409    /// Set the batch_size property
410    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
411        self.batch_size = batch_size;
412        self
413    }
414
415    /// Register an expression adapter used to adapt filters and projections that are pushed down into the scan
416    /// from the logical schema to the physical schema of the file.
417    /// This can include things like:
418    /// - Column ordering changes
419    /// - Handling of missing columns
420    /// - Rewriting expression to use pre-computed values or file format specific optimizations
421    pub fn with_expr_adapter(
422        mut self,
423        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
424    ) -> Self {
425        self.expr_adapter_factory = expr_adapter;
426        self
427    }
428
429    /// Set whether file groups are organized by partition column values.
430    ///
431    /// When set to true, the output partitioning will be declared as Hash partitioning
432    /// on the partition columns.
433    pub fn with_partitioned_by_file_group(
434        mut self,
435        partitioned_by_file_group: bool,
436    ) -> Self {
437        self.partitioned_by_file_group = partitioned_by_file_group;
438        self
439    }
440
441    /// Build the final [`FileScanConfig`] with all the configured settings.
442    ///
443    /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
444    /// Any unset optional fields will use their default values.
445    ///
446    /// # Errors
447    /// Returns an error if projection pushdown fails or if schema operations fail.
448    pub fn build(self) -> FileScanConfig {
449        let Self {
450            object_store_url,
451            file_source,
452            limit,
453            constraints,
454            file_groups,
455            statistics,
456            output_ordering,
457            file_compression_type,
458            batch_size,
459            expr_adapter_factory: expr_adapter,
460            partitioned_by_file_group,
461        } = self;
462
463        let constraints = constraints.unwrap_or_default();
464        let statistics = statistics.unwrap_or_else(|| {
465            Statistics::new_unknown(file_source.table_schema().table_schema())
466        });
467        let file_compression_type =
468            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
469
470        FileScanConfig {
471            object_store_url,
472            file_source,
473            limit,
474            constraints,
475            file_groups,
476            output_ordering,
477            file_compression_type,
478            batch_size,
479            expr_adapter_factory: expr_adapter,
480            statistics,
481            partitioned_by_file_group,
482        }
483    }
484}
485
486impl From<FileScanConfig> for FileScanConfigBuilder {
487    fn from(config: FileScanConfig) -> Self {
488        Self {
489            object_store_url: config.object_store_url,
490            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
491            file_groups: config.file_groups,
492            statistics: Some(config.statistics),
493            output_ordering: config.output_ordering,
494            file_compression_type: Some(config.file_compression_type),
495            limit: config.limit,
496            constraints: Some(config.constraints),
497            batch_size: config.batch_size,
498            expr_adapter_factory: config.expr_adapter_factory,
499            partitioned_by_file_group: config.partitioned_by_file_group,
500        }
501    }
502}
503
504impl DataSource for FileScanConfig {
505    fn open(
506        &self,
507        partition: usize,
508        context: Arc<TaskContext>,
509    ) -> Result<SendableRecordBatchStream> {
510        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
511        let batch_size = self
512            .batch_size
513            .unwrap_or_else(|| context.session_config().batch_size());
514
515        let source = self.file_source.with_batch_size(batch_size);
516
517        let opener = source.create_file_opener(object_store, self, partition)?;
518
519        let stream = FileStream::new(self, partition, opener, source.metrics())?;
520        Ok(Box::pin(cooperative(stream)))
521    }
522
523    fn as_any(&self) -> &dyn Any {
524        self
525    }
526
527    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
528        match t {
529            DisplayFormatType::Default | DisplayFormatType::Verbose => {
530                let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
531                let orderings = get_projected_output_ordering(self, &schema);
532
533                write!(f, "file_groups=")?;
534                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
535
536                if !schema.fields().is_empty() {
537                    if let Some(projection) = self.file_source.projection() {
538                        // This matches what ProjectionExec does.
539                        // TODO: can we put this into ProjectionExprs so that it's shared code?
540                        let expr: Vec<String> = projection
541                            .as_ref()
542                            .iter()
543                            .map(|proj_expr| {
544                                if let Some(column) =
545                                    proj_expr.expr.as_any().downcast_ref::<Column>()
546                                {
547                                    if column.name() == proj_expr.alias {
548                                        column.name().to_string()
549                                    } else {
550                                        format!(
551                                            "{} as {}",
552                                            proj_expr.expr, proj_expr.alias
553                                        )
554                                    }
555                                } else {
556                                    format!("{} as {}", proj_expr.expr, proj_expr.alias)
557                                }
558                            })
559                            .collect();
560                        write!(f, ", projection=[{}]", expr.join(", "))?;
561                    } else {
562                        write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
563                    }
564                }
565
566                if let Some(limit) = self.limit {
567                    write!(f, ", limit={limit}")?;
568                }
569
570                display_orderings(f, &orderings)?;
571
572                if !self.constraints.is_empty() {
573                    write!(f, ", {}", self.constraints)?;
574                }
575
576                self.fmt_file_source(t, f)
577            }
578            DisplayFormatType::TreeRender => {
579                writeln!(f, "format={}", self.file_source.file_type())?;
580                self.file_source.fmt_extra(t, f)?;
581                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
582                writeln!(f, "files={num_files}")?;
583                Ok(())
584            }
585        }
586    }
587
588    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
589    fn repartitioned(
590        &self,
591        target_partitions: usize,
592        repartition_file_min_size: usize,
593        output_ordering: Option<LexOrdering>,
594    ) -> Result<Option<Arc<dyn DataSource>>> {
595        // When files are grouped by partition values, we cannot allow byte-range
596        // splitting. It would mix rows from different partition values across
597        // file groups, breaking the Hash partitioning.
598        if self.partitioned_by_file_group {
599            return Ok(None);
600        }
601
602        let source = self.file_source.repartitioned(
603            target_partitions,
604            repartition_file_min_size,
605            output_ordering,
606            self,
607        )?;
608
609        Ok(source.map(|s| Arc::new(s) as _))
610    }
611
612    /// Returns the output partitioning for this file scan.
613    ///
614    /// When `partitioned_by_file_group` is true, this returns `Partitioning::Hash` on
615    /// the Hive partition columns, allowing the optimizer to skip hash repartitioning
616    /// for aggregates and joins on those columns.
617    ///
618    /// Tradeoffs
619    /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with
620    ///   `GROUP BY` or `ORDER BY` on partition columns.
621    /// - Cost: Files are grouped by partition values rather than split by byte
622    ///   ranges, which may reduce I/O parallelism when partition sizes are uneven.
623    ///   For simple aggregations without `ORDER BY`, this cost may outweigh the benefit.
624    ///
625    /// Follow-up Work
626    /// - Idea: Could allow byte-range splitting within partition-aware groups,
627    ///   preserving I/O parallelism while maintaining partition semantics.
628    fn output_partitioning(&self) -> Partitioning {
629        if self.partitioned_by_file_group {
630            let partition_cols = self.table_partition_cols();
631            if !partition_cols.is_empty() {
632                let projected_schema = match self.projected_schema() {
633                    Ok(schema) => schema,
634                    Err(_) => {
635                        debug!(
636                            "Could not get projected schema, falling back to UnknownPartitioning."
637                        );
638                        return Partitioning::UnknownPartitioning(self.file_groups.len());
639                    }
640                };
641
642                // Build Column expressions for partition columns based on their
643                // position in the projected schema
644                let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
645                for partition_col in partition_cols {
646                    if let Some((idx, _)) = projected_schema
647                        .fields()
648                        .iter()
649                        .enumerate()
650                        .find(|(_, f)| f.name() == partition_col.name())
651                    {
652                        exprs.push(Arc::new(Column::new(partition_col.name(), idx)));
653                    }
654                }
655
656                if exprs.len() == partition_cols.len() {
657                    return Partitioning::Hash(exprs, self.file_groups.len());
658                }
659            }
660        }
661        Partitioning::UnknownPartitioning(self.file_groups.len())
662    }
663
664    fn eq_properties(&self) -> EquivalenceProperties {
665        let schema = self.file_source.table_schema().table_schema();
666        let mut eq_properties = EquivalenceProperties::new_with_orderings(
667            Arc::clone(schema),
668            self.validated_output_ordering(),
669        )
670        .with_constraints(self.constraints.clone());
671
672        if let Some(filter) = self.file_source.filter() {
673            // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
674            // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
675            match Self::add_filter_equivalence_info(&filter, &mut eq_properties, schema) {
676                Ok(()) => {}
677                Err(e) => {
678                    warn!("Failed to add filter equivalence info: {e}");
679                    #[cfg(debug_assertions)]
680                    panic!("Failed to add filter equivalence info: {e}");
681                }
682            }
683        }
684
685        if let Some(projection) = self.file_source.projection() {
686            match (
687                projection.project_schema(schema),
688                projection.projection_mapping(schema),
689            ) {
690                (Ok(output_schema), Ok(mapping)) => {
691                    eq_properties =
692                        eq_properties.project(&mapping, Arc::new(output_schema));
693                }
694                (Err(e), _) | (_, Err(e)) => {
695                    warn!("Failed to project equivalence properties: {e}");
696                    #[cfg(debug_assertions)]
697                    panic!("Failed to project equivalence properties: {e}");
698                }
699            }
700        }
701
702        eq_properties
703    }
704
705    fn scheduling_type(&self) -> SchedulingType {
706        SchedulingType::Cooperative
707    }
708
709    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
710        if let Some(partition) = partition {
711            // Get statistics for a specific partition
712            // Note: FileGroup statistics include partition columns (computed from partition_values)
713            if let Some(file_group) = self.file_groups.get(partition)
714                && let Some(stat) = file_group.file_statistics(None)
715            {
716                // Project the statistics based on the projection
717                let output_schema = self.projected_schema()?;
718                return if let Some(projection) = self.file_source.projection() {
719                    projection.project_statistics(stat.clone(), &output_schema)
720                } else {
721                    Ok(stat.clone())
722                };
723            }
724            // If no statistics available for this partition, return unknown
725            Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
726        } else {
727            // Return aggregate statistics across all partitions
728            let statistics = self.statistics();
729            let projection = self.file_source.projection();
730            let output_schema = self.projected_schema()?;
731            if let Some(projection) = &projection {
732                projection.project_statistics(statistics.clone(), &output_schema)
733            } else {
734                Ok(statistics)
735            }
736        }
737    }
738
739    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
740        let source = FileScanConfigBuilder::from(self.clone())
741            .with_limit(limit)
742            .build();
743        Some(Arc::new(source))
744    }
745
    /// Returns the row limit configured on this scan, if any.
    fn fetch(&self) -> Option<usize> {
        self.limit
    }
749
750    fn metrics(&self) -> ExecutionPlanMetricsSet {
751        self.file_source.metrics().clone()
752    }
753
754    fn try_swapping_with_projection(
755        &self,
756        projection: &ProjectionExprs,
757    ) -> Result<Option<Arc<dyn DataSource>>> {
758        match self.file_source.try_pushdown_projection(projection)? {
759            Some(new_source) => {
760                let mut new_file_scan_config = self.clone();
761                new_file_scan_config.file_source = new_source;
762                Ok(Some(Arc::new(new_file_scan_config) as Arc<dyn DataSource>))
763            }
764            None => Ok(None),
765        }
766    }
767
768    fn try_pushdown_filters(
769        &self,
770        filters: Vec<Arc<dyn PhysicalExpr>>,
771        config: &ConfigOptions,
772    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
773        // Remap filter Column indices to match the table schema (file + partition columns).
774        // This is necessary because filters may have been created against a different schema
775        // (e.g., after projection pushdown) and need to be remapped to the table schema
776        // before being passed to the file source and ultimately serialized.
777        // For example, the filter being pushed down is `c1_c2 > 5` and it was created
778        // against the output schema of the this `DataSource` which has projection `c1 + c2 as c1_c2`.
779        // Thus we need to rewrite the filter back to `c1 + c2 > 5` before passing it to the file source.
780        let table_schema = self.file_source.table_schema().table_schema();
781        // If there's a projection with aliases, first map the filters back through
782        // the projection expressions before remapping to the table schema.
783        let filters_to_remap = if let Some(projection) = self.file_source.projection() {
784            use datafusion_physical_plan::projection::update_expr;
785            filters
786                .into_iter()
787                .map(|filter| {
788                    update_expr(&filter, projection.as_ref(), true)?.ok_or_else(|| {
789                        internal_datafusion_err!(
790                            "Failed to map filter expression through projection: {}",
791                            filter
792                        )
793                    })
794                })
795                .collect::<Result<Vec<_>>>()?
796        } else {
797            filters
798        };
799        // Now remap column indices to match the table schema.
800        let remapped_filters: Result<Vec<_>> = filters_to_remap
801            .into_iter()
802            .map(|filter| reassign_expr_columns(filter, table_schema.as_ref()))
803            .collect();
804        let remapped_filters = remapped_filters?;
805
806        let result = self
807            .file_source
808            .try_pushdown_filters(remapped_filters, config)?;
809        match result.updated_node {
810            Some(new_file_source) => {
811                let mut new_file_scan_config = self.clone();
812                new_file_scan_config.file_source = new_file_source;
813                Ok(FilterPushdownPropagation {
814                    filters: result.filters,
815                    updated_node: Some(Arc::new(new_file_scan_config) as _),
816                })
817            }
818            None => {
819                // If the file source does not support filter pushdown, return the original config
820                Ok(FilterPushdownPropagation {
821                    filters: result.filters,
822                    updated_node: None,
823                })
824            }
825        }
826    }
827
828    fn try_pushdown_sort(
829        &self,
830        order: &[PhysicalSortExpr],
831    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
832        // Delegate to FileSource to check if reverse scanning can satisfy the request.
833        let pushdown_result = self
834            .file_source
835            .try_reverse_output(order, &self.eq_properties())?;
836
837        match pushdown_result {
838            SortOrderPushdownResult::Exact { inner } => {
839                Ok(SortOrderPushdownResult::Exact {
840                    inner: self.rebuild_with_source(inner, true)?,
841                })
842            }
843            SortOrderPushdownResult::Inexact { inner } => {
844                Ok(SortOrderPushdownResult::Inexact {
845                    inner: self.rebuild_with_source(inner, false)?,
846                })
847            }
848            SortOrderPushdownResult::Unsupported => {
849                Ok(SortOrderPushdownResult::Unsupported)
850            }
851        }
852    }
853}
854
855impl FileScanConfig {
856    /// Returns only the output orderings that are validated against actual
857    /// file group statistics.
858    ///
859    /// For example, individual files may be ordered by `col1 ASC`,
860    /// but if we have files with these min/max statistics in a single partition / file group:
861    ///
862    /// - file1: min(col1) = 10, max(col1) = 20
863    /// - file2: min(col1) = 5, max(col1) = 15
864    ///
865    /// Because reading file1 followed by file2 would produce out-of-order output (there is overlap
866    /// in the ranges), we cannot retain `col1 ASC` as a valid output ordering.
867    ///
868    /// Similarly this would not be a valid order (non-overlapping ranges but not ordered):
869    ///
870    /// - file1: min(col1) = 20, max(col1) = 30
871    /// - file2: min(col1) = 10, max(col1) = 15
872    ///
873    /// On the other hand if we had:
874    ///
875    /// - file1: min(col1) = 5, max(col1) = 15
876    /// - file2: min(col1) = 16, max(col1) = 25
877    ///
878    /// Then we know that reading file1 followed by file2 will produce ordered output,
879    /// so `col1 ASC` would be retained.
880    ///
881    /// Note that we are checking for ordering *within* *each* file group / partition,
882    /// files in different partitions are read independently and do not affect each other's ordering.
883    /// Merging of the multiple partition streams into a single ordered stream is handled
884    /// upstream e.g. by `SortPreservingMergeExec`.
885    fn validated_output_ordering(&self) -> Vec<LexOrdering> {
886        let schema = self.file_source.table_schema().table_schema();
887        validate_orderings(&self.output_ordering, schema, &self.file_groups, None)
888    }
889
890    /// Get the file schema (schema of the files without partition columns)
891    pub fn file_schema(&self) -> &SchemaRef {
892        self.file_source.table_schema().file_schema()
893    }
894
895    /// Get the table partition columns
896    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
897        self.file_source.table_schema().table_partition_cols()
898    }
899
900    /// Returns the unprojected table statistics, marking them as inexact if filters are present.
901    ///
902    /// When filters are pushed down (including pruning predicates and bloom filters),
903    /// we can't guarantee the statistics are exact because we don't know how many
904    /// rows will be filtered out.
905    pub fn statistics(&self) -> Statistics {
906        if self.file_source.filter().is_some() {
907            self.statistics.clone().to_inexact()
908        } else {
909            self.statistics.clone()
910        }
911    }
912
913    pub fn projected_schema(&self) -> Result<Arc<Schema>> {
914        let schema = self.file_source.table_schema().table_schema();
915        match self.file_source.projection() {
916            Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)),
917            None => Ok(Arc::clone(schema)),
918        }
919    }
920
921    fn add_filter_equivalence_info(
922        filter: &Arc<dyn PhysicalExpr>,
923        eq_properties: &mut EquivalenceProperties,
924        schema: &Schema,
925    ) -> Result<()> {
926        // Gather valid equality pairs from the filter expression
927        let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
928            // Ignore any binary expressions that reference non-existent columns in the current schema
929            // (e.g. due to unnecessary projections being removed)
930            reassign_expr_columns(Arc::clone(expr), schema)
931                .ok()
932                .and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
933                    Some(expr) if expr.op() == &Operator::Eq => {
934                        Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
935                    }
936                    _ => None,
937                })
938        });
939
940        for (lhs, rhs) in equal_pairs {
941            eq_properties.add_equal_conditions(lhs, rhs)?
942        }
943
944        Ok(())
945    }
946
    /// Returns whether newlines in values are supported.
    ///
    /// This method always returns `false` and is kept only as a deprecated
    /// shim. The actual newlines_in_values setting has been moved to
    /// [`CsvSource`] and should be accessed via [`CsvSource::csv_options()`]
    /// instead.
    ///
    /// [`CsvSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html
    /// [`CsvSource::csv_options()`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html#method.csv_options
    #[deprecated(
        since = "52.0.0",
        note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn newlines_in_values(&self) -> bool {
        false
    }
962
963    #[deprecated(
964        since = "52.0.0",
965        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
966    )]
967    pub fn projected_constraints(&self) -> Constraints {
968        let props = self.eq_properties();
969        props.constraints().clone()
970    }
971
972    #[deprecated(
973        since = "52.0.0",
974        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
975    )]
976    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
977        #[expect(deprecated)]
978        self.file_source.projection().as_ref().map(|p| {
979            p.ordered_column_indices()
980                .into_iter()
981                .filter(|&i| i < self.file_schema().fields().len())
982                .collect::<Vec<_>>()
983        })
984    }
985
986    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
987    ///
988    /// The method distributes files across a target number of partitions while ensuring
989    /// files within each partition maintain sort order based on their min/max statistics.
990    ///
991    /// The algorithm works by:
992    /// 1. Takes files sorted by minimum values
993    /// 2. For each file:
994    ///   - Finds eligible groups (empty or where file's min > group's last max)
995    ///   - Selects the smallest eligible group
996    ///   - Creates a new group if needed
997    ///
998    /// # Parameters
999    /// * `table_schema`: Schema containing information about the columns
1000    /// * `file_groups`: The original file groups to split
1001    /// * `sort_order`: The lexicographical ordering to maintain within each group
1002    /// * `target_partitions`: The desired number of output partitions
1003    ///
1004    /// # Returns
1005    /// A new set of file groups, where files within each group are non-overlapping with respect to
1006    /// their min/max statistics and maintain the specified sort order.
1007    pub fn split_groups_by_statistics_with_target_partitions(
1008        table_schema: &SchemaRef,
1009        file_groups: &[FileGroup],
1010        sort_order: &LexOrdering,
1011        target_partitions: usize,
1012    ) -> Result<Vec<FileGroup>> {
1013        if target_partitions == 0 {
1014            return Err(internal_datafusion_err!(
1015                "target_partitions must be greater than 0"
1016            ));
1017        }
1018
1019        let flattened_files = file_groups
1020            .iter()
1021            .flat_map(FileGroup::iter)
1022            .collect::<Vec<_>>();
1023
1024        if flattened_files.is_empty() {
1025            return Ok(vec![]);
1026        }
1027
1028        let statistics = MinMaxStatistics::new_from_files(
1029            sort_order,
1030            table_schema,
1031            None,
1032            flattened_files.iter().copied(),
1033        )?;
1034
1035        let indices_sorted_by_min = statistics.min_values_sorted();
1036
1037        // Initialize with target_partitions empty groups
1038        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
1039
1040        for (idx, min) in indices_sorted_by_min {
1041            if let Some((_, group)) = file_groups_indices
1042                .iter_mut()
1043                .enumerate()
1044                .filter(|(_, group)| {
1045                    group.is_empty()
1046                        || min
1047                            > statistics
1048                                .max(*group.last().expect("groups should not be empty"))
1049                })
1050                .min_by_key(|(_, group)| group.len())
1051            {
1052                group.push(idx);
1053            } else {
1054                // Create a new group if no existing group fits
1055                file_groups_indices.push(vec![idx]);
1056            }
1057        }
1058
1059        // Remove any empty groups
1060        file_groups_indices.retain(|group| !group.is_empty());
1061
1062        // Assemble indices back into groups of PartitionedFiles
1063        Ok(file_groups_indices
1064            .into_iter()
1065            .map(|file_group_indices| {
1066                FileGroup::new(
1067                    file_group_indices
1068                        .into_iter()
1069                        .map(|idx| flattened_files[idx].clone())
1070                        .collect(),
1071                )
1072            })
1073            .collect())
1074    }
1075
1076    /// Attempts to do a bin-packing on files into file groups, such that any two files
1077    /// in a file group are ordered and non-overlapping with respect to their statistics.
1078    /// It will produce the smallest number of file groups possible.
1079    pub fn split_groups_by_statistics(
1080        table_schema: &SchemaRef,
1081        file_groups: &[FileGroup],
1082        sort_order: &LexOrdering,
1083    ) -> Result<Vec<FileGroup>> {
1084        let flattened_files = file_groups
1085            .iter()
1086            .flat_map(FileGroup::iter)
1087            .collect::<Vec<_>>();
1088        // First Fit:
1089        // * Choose the first file group that a file can be placed into.
1090        // * If it fits into no existing file groups, create a new one.
1091        //
1092        // By sorting files by min values and then applying first-fit bin packing,
1093        // we can produce the smallest number of file groups such that
1094        // files within a group are in order and non-overlapping.
1095        //
1096        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
1097        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
1098
1099        if flattened_files.is_empty() {
1100            return Ok(vec![]);
1101        }
1102
1103        let statistics = MinMaxStatistics::new_from_files(
1104            sort_order,
1105            table_schema,
1106            None,
1107            flattened_files.iter().copied(),
1108        )
1109        .map_err(|e| {
1110            e.context("construct min/max statistics for split_groups_by_statistics")
1111        })?;
1112
1113        let indices_sorted_by_min = statistics.min_values_sorted();
1114        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
1115
1116        for (idx, min) in indices_sorted_by_min {
1117            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
1118                // If our file is non-overlapping and comes _after_ the last file,
1119                // it fits in this file group.
1120                min > statistics.max(
1121                    *group
1122                        .last()
1123                        .expect("groups should be nonempty at construction"),
1124                )
1125            });
1126            match file_group_to_insert {
1127                Some(group) => group.push(idx),
1128                None => file_groups_indices.push(vec![idx]),
1129            }
1130        }
1131
1132        // Assemble indices back into groups of PartitionedFiles
1133        Ok(file_groups_indices
1134            .into_iter()
1135            .map(|file_group_indices| {
1136                file_group_indices
1137                    .into_iter()
1138                    .map(|idx| flattened_files[idx].clone())
1139                    .collect()
1140            })
1141            .collect())
1142    }
1143
1144    /// Write the data_type based on file_source
1145    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1146        write!(f, ", file_type={}", self.file_source.file_type())?;
1147        self.file_source.fmt_extra(t, f)
1148    }
1149
    /// Returns a reference to the shared [`FileSource`] this config scans with.
    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }
1154
1155    /// Helper: Rebuild FileScanConfig with new file source
1156    fn rebuild_with_source(
1157        &self,
1158        new_file_source: Arc<dyn FileSource>,
1159        is_exact: bool,
1160    ) -> Result<Arc<dyn DataSource>> {
1161        let mut new_config = self.clone();
1162
1163        // Reverse file groups (FileScanConfig's responsibility)
1164        new_config.file_groups = new_config
1165            .file_groups
1166            .into_iter()
1167            .map(|group| {
1168                let mut files = group.into_inner();
1169                files.reverse();
1170                files.into()
1171            })
1172            .collect();
1173
1174        new_config.file_source = new_file_source;
1175
1176        // Phase 1: Clear output_ordering for Inexact
1177        // (we're only reversing row groups, not guaranteeing perfect ordering)
1178        if !is_exact {
1179            new_config.output_ordering = vec![];
1180        }
1181
1182        Ok(Arc::new(new_config))
1183    }
1184}
1185
1186impl Debug for FileScanConfig {
1187    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1188        write!(f, "FileScanConfig {{")?;
1189        write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1190
1191        write!(f, "statistics={:?}, ", self.statistics())?;
1192
1193        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1194        write!(f, "}}")
1195    }
1196}
1197
1198impl DisplayAs for FileScanConfig {
1199    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1200        let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
1201        let orderings = get_projected_output_ordering(self, &schema);
1202
1203        write!(f, "file_groups=")?;
1204        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1205
1206        if !schema.fields().is_empty() {
1207            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1208        }
1209
1210        if let Some(limit) = self.limit {
1211            write!(f, ", limit={limit}")?;
1212        }
1213
1214        display_orderings(f, &orderings)?;
1215
1216        if !self.constraints.is_empty() {
1217            write!(f, ", {}", self.constraints)?;
1218        }
1219
1220        Ok(())
1221    }
1222}
1223
1224/// Get the indices of columns in a projection if the projection is a simple
1225/// list of columns.
1226/// If there are any expressions other than columns, returns None.
1227fn ordered_column_indices_from_projection(
1228    projection: &ProjectionExprs,
1229) -> Option<Vec<usize>> {
1230    projection
1231        .expr_iter()
1232        .map(|e| {
1233            let index = e.as_any().downcast_ref::<Column>()?.index();
1234            Some(index)
1235        })
1236        .collect::<Option<Vec<usize>>>()
1237}
1238
1239/// Check whether a given ordering is valid for all file groups by verifying
1240/// that files within each group are sorted according to their min/max statistics.
1241///
1242/// For single-file (or empty) groups, the ordering is trivially valid.
1243/// For multi-file groups, we check that the min/max statistics for the sort
1244/// columns are in order and non-overlapping (or touching at boundaries).
1245///
1246/// `projection` maps projected column indices back to table-schema indices
1247/// when validating after projection; pass `None` when validating at
1248/// table-schema level.
1249fn is_ordering_valid_for_file_groups(
1250    file_groups: &[FileGroup],
1251    ordering: &LexOrdering,
1252    schema: &SchemaRef,
1253    projection: Option<&[usize]>,
1254) -> bool {
1255    file_groups.iter().all(|group| {
1256        if group.len() <= 1 {
1257            return true; // single-file groups are trivially sorted
1258        }
1259        match MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
1260        {
1261            Ok(stats) => stats.is_sorted(),
1262            Err(_) => false, // can't prove sorted → reject
1263        }
1264    })
1265}
1266
1267/// Filters orderings to retain only those valid for all file groups,
1268/// verified via min/max statistics.
1269fn validate_orderings(
1270    orderings: &[LexOrdering],
1271    schema: &SchemaRef,
1272    file_groups: &[FileGroup],
1273    projection: Option<&[usize]>,
1274) -> Vec<LexOrdering> {
1275    orderings
1276        .iter()
1277        .filter(|ordering| {
1278            is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection)
1279        })
1280        .cloned()
1281        .collect()
1282}
1283
/// The various listing tables do not attempt to read all files
1285/// concurrently, instead they will read files in sequence within a
1286/// partition.  This is an important property as it allows plans to
1287/// run against 1000s of files and not try to open them all
1288/// concurrently.
1289///
1290/// However, it means if we assign more than one file to a partition
1291/// the output sort order will not be preserved as illustrated in the
1292/// following diagrams:
1293///
1294/// When only 1 file is assigned to each partition, each partition is
1295/// correctly sorted on `(A, B, C)`
1296///
1297/// ```text
1298/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
1299///   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
1300/// ┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
1301///   │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
1302/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
1303///   │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
1304/// ┃                                          │                    │                     ┃
1305///   │                   │ │                    │                    │                 │
1306/// ┃                                          │                    │                     ┃
1307///   │                   │ │                    │                    │                 │
1308/// ┃                                          │                    │                     ┃
1309///   │                   │ │                    │                    │                 │
1310/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1311///      DataFusion           DataFusion           DataFusion           DataFusion
1312/// ┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
1313///  ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1314///
1315///                                      DataSourceExec
1316/// ```
1317///
1318/// However, when more than 1 file is assigned to each partition, each
1319/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
1320/// file is scanned, the same values for A, B and C can be repeated in
1321/// the same sorted stream
1322///
/// ```text
1324/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1325///   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1326/// ┃   ┌───────────────┐     ┌──────────────┐ │
1327///   │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
1328/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1329///   │ └───────────────┘ │ │ └──────────────┘   ┃
1330/// ┃   ┌───────────────┐     ┌──────────────┐ │
1331///   │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
1332/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1333///   │ └───────────────┘ │ │ └──────────────┘   ┃
1334/// ┃                                          │
1335///   │                   │ │                    ┃
1336/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
1337///      DataFusion           DataFusion         ┃
1338/// ┃    Partition 1          Partition 2
1339///  ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
1340///
1341///              DataSourceExec
1342/// ```
1343fn get_projected_output_ordering(
1344    base_config: &FileScanConfig,
1345    projected_schema: &SchemaRef,
1346) -> Vec<LexOrdering> {
1347    let projected_orderings =
1348        project_orderings(&base_config.output_ordering, projected_schema);
1349
1350    let indices = base_config
1351        .file_source
1352        .projection()
1353        .as_ref()
1354        .map(|p| ordered_column_indices_from_projection(p));
1355
1356    match indices {
1357        Some(Some(indices)) => {
1358            // Simple column projection — validate with statistics
1359            validate_orderings(
1360                &projected_orderings,
1361                projected_schema,
1362                &base_config.file_groups,
1363                Some(indices.as_slice()),
1364            )
1365        }
1366        None => {
1367            // No projection — validate with statistics (no remapping needed)
1368            validate_orderings(
1369                &projected_orderings,
1370                projected_schema,
1371                &base_config.file_groups,
1372                None,
1373            )
1374        }
1375        Some(None) => {
1376            // Complex projection (expressions, not simple columns) — can't
1377            // determine column indices for statistics. Still valid if all
1378            // file groups have at most one file.
1379            if base_config.file_groups.iter().all(|g| g.len() <= 1) {
1380                projected_orderings
1381            } else {
1382                debug!(
1383                    "Skipping specified output orderings. \
1384                     Some file groups couldn't be determined to be sorted: {:?}",
1385                    base_config.file_groups
1386                );
1387                vec![]
1388            }
1389        }
1390    }
1391}
1392
1393/// Convert type to a type suitable for use as a `ListingTable`
1394/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
1395/// a reasonable trade off between a reasonable number of partition
1396/// values and space efficiency.
1397///
1398/// This use this to specify types for partition columns. However
1399/// you MAY also choose not to dictionary-encode the data or to use a
1400/// different dictionary type.
1401///
1402/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same say.
1403pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1404    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1405}
1406
1407/// Convert a [`ScalarValue`] of partition columns to a type, as
1408/// described in the documentation of [`wrap_partition_type_in_dict`],
1409/// which can wrap the types.
1410pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1411    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416    use std::collections::HashMap;
1417
1418    use super::*;
1419    use crate::TableSchema;
1420    use crate::test_util::col;
1421    use crate::{
1422        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1423        verify_sort_integrity,
1424    };
1425
1426    use arrow::datatypes::Field;
1427    use datafusion_common::stats::Precision;
1428    use datafusion_common::{ColumnStatistics, internal_err};
1429    use datafusion_expr::{Operator, SortExpr};
1430    use datafusion_physical_expr::create_physical_sort_expr;
1431    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
1432    use datafusion_physical_expr::projection::ProjectionExpr;
1433    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
1434
1435    #[test]
1436    fn physical_plan_config_no_projection_tab_cols_as_field() {
1437        let file_schema = aggr_test_schema();
1438
1439        // make a table_partition_col as a field
1440        let table_partition_col =
1441            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1442                .with_metadata(HashMap::from_iter(vec![(
1443                    "key_whatever".to_owned(),
1444                    "value_whatever".to_owned(),
1445                )]));
1446
1447        let conf = config_for_projection(
1448            Arc::clone(&file_schema),
1449            None,
1450            Statistics::new_unknown(&file_schema),
1451            vec![table_partition_col.clone()],
1452        );
1453
1454        // verify the proj_schema includes the last column and exactly the same the field it is defined
1455        let proj_schema = conf.projected_schema().unwrap();
1456        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1457        assert_eq!(
1458            *proj_schema.field(file_schema.fields().len()),
1459            table_partition_col,
1460            "partition columns are the last columns and ust have all values defined in created field"
1461        );
1462    }
1463
1464    #[test]
1465    fn test_split_groups_by_statistics() -> Result<()> {
1466        use chrono::TimeZone;
1467        use datafusion_common::DFSchema;
1468        use datafusion_expr::execution_props::ExecutionProps;
1469        use object_store::{ObjectMeta, path::Path};
1470
1471        struct File {
1472            name: &'static str,
1473            date: &'static str,
1474            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1475        }
1476        impl File {
1477            fn new(
1478                name: &'static str,
1479                date: &'static str,
1480                statistics: Vec<Option<(f64, f64)>>,
1481            ) -> Self {
1482                Self::new_nullable(
1483                    name,
1484                    date,
1485                    statistics
1486                        .into_iter()
1487                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1488                        .collect(),
1489                )
1490            }
1491
1492            fn new_nullable(
1493                name: &'static str,
1494                date: &'static str,
1495                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1496            ) -> Self {
1497                Self {
1498                    name,
1499                    date,
1500                    statistics,
1501                }
1502            }
1503        }
1504
1505        struct TestCase {
1506            name: &'static str,
1507            file_schema: Schema,
1508            files: Vec<File>,
1509            sort: Vec<SortExpr>,
1510            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1511        }
1512
1513        use datafusion_expr::col;
1514        let cases = vec![
1515            TestCase {
1516                name: "test sort",
1517                file_schema: Schema::new(vec![Field::new(
1518                    "value".to_string(),
1519                    DataType::Float64,
1520                    false,
1521                )]),
1522                files: vec![
1523                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1524                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1525                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1526                ],
1527                sort: vec![col("value").sort(true, false)],
1528                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1529            },
1530            // same input but file '2' is in the middle
1531            // test that we still order correctly
1532            TestCase {
1533                name: "test sort with files ordered differently",
1534                file_schema: Schema::new(vec![Field::new(
1535                    "value".to_string(),
1536                    DataType::Float64,
1537                    false,
1538                )]),
1539                files: vec![
1540                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1541                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1542                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1543                ],
1544                sort: vec![col("value").sort(true, false)],
1545                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1546            },
1547            TestCase {
1548                name: "reverse sort",
1549                file_schema: Schema::new(vec![Field::new(
1550                    "value".to_string(),
1551                    DataType::Float64,
1552                    false,
1553                )]),
1554                files: vec![
1555                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1556                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1557                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1558                ],
1559                sort: vec![col("value").sort(false, true)],
1560                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1561            },
1562            TestCase {
1563                name: "nullable sort columns, nulls last",
1564                file_schema: Schema::new(vec![Field::new(
1565                    "value".to_string(),
1566                    DataType::Float64,
1567                    true,
1568                )]),
1569                files: vec![
1570                    File::new_nullable(
1571                        "0",
1572                        "2023-01-01",
1573                        vec![Some((Some(0.00), Some(0.49)))],
1574                    ),
1575                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
1576                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
1577                ],
1578                sort: vec![col("value").sort(true, false)],
1579                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1580            },
1581            TestCase {
1582                name: "nullable sort columns, nulls first",
1583                file_schema: Schema::new(vec![Field::new(
1584                    "value".to_string(),
1585                    DataType::Float64,
1586                    true,
1587                )]),
1588                files: vec![
1589                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
1590                    File::new_nullable(
1591                        "1",
1592                        "2023-01-01",
1593                        vec![Some((Some(0.50), Some(1.00)))],
1594                    ),
1595                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
1596                ],
1597                sort: vec![col("value").sort(true, true)],
1598                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1599            },
1600            TestCase {
1601                name: "all three non-overlapping",
1602                file_schema: Schema::new(vec![Field::new(
1603                    "value".to_string(),
1604                    DataType::Float64,
1605                    false,
1606                )]),
1607                files: vec![
1608                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1609                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
1610                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
1611                ],
1612                sort: vec![col("value").sort(true, false)],
1613                expected_result: Ok(vec![vec!["0", "1", "2"]]),
1614            },
1615            TestCase {
1616                name: "all three overlapping",
1617                file_schema: Schema::new(vec![Field::new(
1618                    "value".to_string(),
1619                    DataType::Float64,
1620                    false,
1621                )]),
1622                files: vec![
1623                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1624                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1625                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
1626                ],
1627                sort: vec![col("value").sort(true, false)],
1628                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
1629            },
1630            TestCase {
1631                name: "empty input",
1632                file_schema: Schema::new(vec![Field::new(
1633                    "value".to_string(),
1634                    DataType::Float64,
1635                    false,
1636                )]),
1637                files: vec![],
1638                sort: vec![col("value").sort(true, false)],
1639                expected_result: Ok(vec![]),
1640            },
1641            TestCase {
1642                name: "one file missing statistics",
1643                file_schema: Schema::new(vec![Field::new(
1644                    "value".to_string(),
1645                    DataType::Float64,
1646                    false,
1647                )]),
1648                files: vec![
1649                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1650                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1651                    File::new("2", "2023-01-02", vec![None]),
1652                ],
1653                sort: vec![col("value").sort(true, false)],
1654                expected_result: Err(
1655                    "construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
1656                ),
1657            },
1658        ];
1659
1660        for case in cases {
1661            let table_schema = Arc::new(Schema::new(
1662                case.file_schema
1663                    .fields()
1664                    .clone()
1665                    .into_iter()
1666                    .cloned()
1667                    .chain(Some(Arc::new(Field::new(
1668                        "date".to_string(),
1669                        DataType::Utf8,
1670                        false,
1671                    ))))
1672                    .collect::<Vec<_>>(),
1673            ));
1674            let Some(sort_order) = LexOrdering::new(
1675                case.sort
1676                    .into_iter()
1677                    .map(|expr| {
1678                        create_physical_sort_expr(
1679                            &expr,
1680                            &DFSchema::try_from(Arc::clone(&table_schema))?,
1681                            &ExecutionProps::default(),
1682                        )
1683                    })
1684                    .collect::<Result<Vec<_>>>()?,
1685            ) else {
1686                return internal_err!("This test should always use an ordering");
1687            };
1688
1689            let partitioned_files = FileGroup::new(
1690                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
1691            );
1692            let result = FileScanConfig::split_groups_by_statistics(
1693                &table_schema,
1694                std::slice::from_ref(&partitioned_files),
1695                &sort_order,
1696            );
1697            let results_by_name = result
1698                .as_ref()
1699                .map(|file_groups| {
1700                    file_groups
1701                        .iter()
1702                        .map(|file_group| {
1703                            file_group
1704                                .iter()
1705                                .map(|file| {
1706                                    partitioned_files
1707                                        .iter()
1708                                        .find_map(|f| {
1709                                            if f.object_meta == file.object_meta {
1710                                                Some(
1711                                                    f.object_meta
1712                                                        .location
1713                                                        .as_ref()
1714                                                        .rsplit('/')
1715                                                        .next()
1716                                                        .unwrap()
1717                                                        .trim_end_matches(".parquet"),
1718                                                )
1719                                            } else {
1720                                                None
1721                                            }
1722                                        })
1723                                        .unwrap()
1724                                })
1725                                .collect::<Vec<_>>()
1726                        })
1727                        .collect::<Vec<_>>()
1728                })
1729                .map_err(|e| e.strip_backtrace().leak() as &'static str);
1730
1731            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
1732        }
1733
1734        return Ok(());
1735
1736        impl From<File> for PartitionedFile {
1737            fn from(file: File) -> Self {
1738                PartitionedFile {
1739                    object_meta: ObjectMeta {
1740                        location: Path::from(format!(
1741                            "data/date={}/{}.parquet",
1742                            file.date, file.name
1743                        )),
1744                        last_modified: chrono::Utc.timestamp_nanos(0),
1745                        size: 0,
1746                        e_tag: None,
1747                        version: None,
1748                    },
1749                    partition_values: vec![ScalarValue::from(file.date)],
1750                    range: None,
1751                    statistics: Some(Arc::new(Statistics {
1752                        num_rows: Precision::Absent,
1753                        total_byte_size: Precision::Absent,
1754                        column_statistics: file
1755                            .statistics
1756                            .into_iter()
1757                            .map(|stats| {
1758                                stats
1759                                    .map(|(min, max)| ColumnStatistics {
1760                                        min_value: Precision::Exact(
1761                                            ScalarValue::Float64(min),
1762                                        ),
1763                                        max_value: Precision::Exact(
1764                                            ScalarValue::Float64(max),
1765                                        ),
1766                                        ..Default::default()
1767                                    })
1768                                    .unwrap_or_default()
1769                            })
1770                            .collect::<Vec<_>>(),
1771                    })),
1772                    extensions: None,
1773                    metadata_size_hint: None,
1774                }
1775            }
1776        }
1777    }
1778
1779    // sets default for configs that play no role in projections
1780    fn config_for_projection(
1781        file_schema: SchemaRef,
1782        projection: Option<Vec<usize>>,
1783        statistics: Statistics,
1784        table_partition_cols: Vec<Field>,
1785    ) -> FileScanConfig {
1786        let table_schema = TableSchema::new(
1787            file_schema,
1788            table_partition_cols.into_iter().map(Arc::new).collect(),
1789        );
1790        FileScanConfigBuilder::new(
1791            ObjectStoreUrl::parse("test:///").unwrap(),
1792            Arc::new(MockSource::new(table_schema.clone())),
1793        )
1794        .with_projection_indices(projection)
1795        .unwrap()
1796        .with_statistics(statistics)
1797        .build()
1798    }
1799
1800    #[test]
1801    fn test_file_scan_config_builder() {
1802        let file_schema = aggr_test_schema();
1803        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1804
1805        let table_schema = TableSchema::new(
1806            Arc::clone(&file_schema),
1807            vec![Arc::new(Field::new(
1808                "date",
1809                wrap_partition_type_in_dict(DataType::Utf8),
1810                false,
1811            ))],
1812        );
1813
1814        let file_source: Arc<dyn FileSource> =
1815            Arc::new(MockSource::new(table_schema.clone()));
1816
1817        // Create a builder with required parameters
1818        let builder = FileScanConfigBuilder::new(
1819            object_store_url.clone(),
1820            Arc::clone(&file_source),
1821        );
1822
1823        // Build with various configurations
1824        let config = builder
1825            .with_limit(Some(1000))
1826            .with_projection_indices(Some(vec![0, 1]))
1827            .unwrap()
1828            .with_statistics(Statistics::new_unknown(&file_schema))
1829            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
1830                "test.parquet".to_string(),
1831                1024,
1832            )])])
1833            .with_output_ordering(vec![
1834                [PhysicalSortExpr::new_default(Arc::new(Column::new(
1835                    "date", 0,
1836                )))]
1837                .into(),
1838            ])
1839            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
1840            .build();
1841
1842        // Verify the built config has all the expected values
1843        assert_eq!(config.object_store_url, object_store_url);
1844        assert_eq!(*config.file_schema(), file_schema);
1845        assert_eq!(config.limit, Some(1000));
1846        assert_eq!(
1847            config
1848                .file_source
1849                .projection()
1850                .as_ref()
1851                .map(|p| p.column_indices()),
1852            Some(vec![0, 1])
1853        );
1854        assert_eq!(config.table_partition_cols().len(), 1);
1855        assert_eq!(config.table_partition_cols()[0].name(), "date");
1856        assert_eq!(config.file_groups.len(), 1);
1857        assert_eq!(config.file_groups[0].len(), 1);
1858        assert_eq!(
1859            config.file_groups[0][0].object_meta.location.as_ref(),
1860            "test.parquet"
1861        );
1862        assert_eq!(
1863            config.file_compression_type,
1864            FileCompressionType::UNCOMPRESSED
1865        );
1866        assert_eq!(config.output_ordering.len(), 1);
1867    }
1868
1869    #[test]
1870    fn equivalence_properties_after_schema_change() {
1871        let file_schema = aggr_test_schema();
1872        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1873
1874        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1875
1876        // Create a file source with a filter
1877        let file_source: Arc<dyn FileSource> = Arc::new(
1878            MockSource::new(table_schema.clone()).with_filter(Arc::new(BinaryExpr::new(
1879                col("c2", &file_schema).unwrap(),
1880                Operator::Eq,
1881                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1882            ))),
1883        );
1884
1885        let config = FileScanConfigBuilder::new(
1886            object_store_url.clone(),
1887            Arc::clone(&file_source),
1888        )
1889        .with_projection_indices(Some(vec![0, 1, 2]))
1890        .unwrap()
1891        .build();
1892
1893        // Simulate projection being updated. Since the filter has already been pushed down,
1894        // the new projection won't include the filtered column.
1895        let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
1896            col("c1", &file_schema).unwrap(),
1897            "c1",
1898        )]);
1899        let data_source = config
1900            .try_swapping_with_projection(&exprs)
1901            .unwrap()
1902            .unwrap();
1903
1904        // Gather the equivalence properties from the new data source. There should
1905        // be no equivalence class for column c2 since it was removed by the projection.
1906        let eq_properties = data_source.eq_properties();
1907        let eq_group = eq_properties.eq_group();
1908
1909        for class in eq_group.iter() {
1910            for expr in class.iter() {
1911                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1912                    assert_ne!(
1913                        col.name(),
1914                        "c2",
1915                        "c2 should not be present in any equivalence class"
1916                    );
1917                }
1918            }
1919        }
1920    }
1921
1922    #[test]
1923    fn test_file_scan_config_builder_defaults() {
1924        let file_schema = aggr_test_schema();
1925        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1926
1927        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1928
1929        let file_source: Arc<dyn FileSource> =
1930            Arc::new(MockSource::new(table_schema.clone()));
1931
1932        // Create a builder with only required parameters and build without any additional configurations
1933        let config = FileScanConfigBuilder::new(
1934            object_store_url.clone(),
1935            Arc::clone(&file_source),
1936        )
1937        .build();
1938
1939        // Verify default values
1940        assert_eq!(config.object_store_url, object_store_url);
1941        assert_eq!(*config.file_schema(), file_schema);
1942        assert_eq!(config.limit, None);
1943        // When no projection is specified, the file source should have an unprojected projection
1944        // (i.e., all columns)
1945        let expected_projection: Vec<usize> = (0..file_schema.fields().len()).collect();
1946        assert_eq!(
1947            config
1948                .file_source
1949                .projection()
1950                .as_ref()
1951                .map(|p| p.column_indices()),
1952            Some(expected_projection)
1953        );
1954        assert!(config.table_partition_cols().is_empty());
1955        assert!(config.file_groups.is_empty());
1956        assert_eq!(
1957            config.file_compression_type,
1958            FileCompressionType::UNCOMPRESSED
1959        );
1960        assert!(config.output_ordering.is_empty());
1961        assert!(config.constraints.is_empty());
1962
1963        // Verify statistics are set to unknown
1964        assert_eq!(config.statistics().num_rows, Precision::Absent);
1965        assert_eq!(config.statistics().total_byte_size, Precision::Absent);
1966        assert_eq!(
1967            config.statistics().column_statistics.len(),
1968            file_schema.fields().len()
1969        );
1970        for stat in config.statistics().column_statistics {
1971            assert_eq!(stat.distinct_count, Precision::Absent);
1972            assert_eq!(stat.min_value, Precision::Absent);
1973            assert_eq!(stat.max_value, Precision::Absent);
1974            assert_eq!(stat.null_count, Precision::Absent);
1975        }
1976    }
1977
1978    #[test]
1979    fn test_file_scan_config_builder_new_from() {
1980        let schema = aggr_test_schema();
1981        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1982        let partition_cols = vec![Field::new(
1983            "date",
1984            wrap_partition_type_in_dict(DataType::Utf8),
1985            false,
1986        )];
1987        let file = PartitionedFile::new("test_file.parquet", 100);
1988
1989        let table_schema = TableSchema::new(
1990            Arc::clone(&schema),
1991            partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
1992        );
1993
1994        let file_source: Arc<dyn FileSource> =
1995            Arc::new(MockSource::new(table_schema.clone()));
1996
1997        // Create a config with non-default values
1998        let original_config = FileScanConfigBuilder::new(
1999            object_store_url.clone(),
2000            Arc::clone(&file_source),
2001        )
2002        .with_projection_indices(Some(vec![0, 2]))
2003        .unwrap()
2004        .with_limit(Some(10))
2005        .with_file(file.clone())
2006        .with_constraints(Constraints::default())
2007        .build();
2008
2009        // Create a new builder from the config
2010        let new_builder = FileScanConfigBuilder::from(original_config);
2011
2012        // Build a new config from this builder
2013        let new_config = new_builder.build();
2014
2015        // Verify properties match
2016        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2017        assert_eq!(new_config.object_store_url, object_store_url);
2018        assert_eq!(*new_config.file_schema(), schema);
2019        assert_eq!(
2020            new_config
2021                .file_source
2022                .projection()
2023                .as_ref()
2024                .map(|p| p.column_indices()),
2025            Some(vec![0, 2])
2026        );
2027        assert_eq!(new_config.limit, Some(10));
2028        assert_eq!(*new_config.table_partition_cols(), partition_cols);
2029        assert_eq!(new_config.file_groups.len(), 1);
2030        assert_eq!(new_config.file_groups[0].len(), 1);
2031        assert_eq!(
2032            new_config.file_groups[0][0].object_meta.location.as_ref(),
2033            "test_file.parquet"
2034        );
2035        assert_eq!(new_config.constraints, Constraints::default());
2036    }
2037
2038    #[test]
2039    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2040        use datafusion_common::DFSchema;
2041        use datafusion_expr::{col, execution_props::ExecutionProps};
2042
2043        let schema = Arc::new(Schema::new(vec![Field::new(
2044            "value",
2045            DataType::Float64,
2046            false,
2047        )]));
2048
2049        // Setup sort expression
2050        let exec_props = ExecutionProps::new();
2051        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2052        let sort_expr = [col("value").sort(true, false)];
2053        let sort_ordering = sort_expr
2054            .map(|expr| {
2055                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
2056            })
2057            .into();
2058
2059        // Test case parameters
2060        struct TestCase {
2061            name: String,
2062            file_count: usize,
2063            overlap_factor: f64,
2064            target_partitions: usize,
2065            expected_partition_count: usize,
2066        }
2067
2068        let test_cases = vec![
2069            // Basic cases
2070            TestCase {
2071                name: "no_overlap_10_files_4_partitions".to_string(),
2072                file_count: 10,
2073                overlap_factor: 0.0,
2074                target_partitions: 4,
2075                expected_partition_count: 4,
2076            },
2077            TestCase {
2078                name: "medium_overlap_20_files_5_partitions".to_string(),
2079                file_count: 20,
2080                overlap_factor: 0.5,
2081                target_partitions: 5,
2082                expected_partition_count: 5,
2083            },
2084            TestCase {
2085                name: "high_overlap_30_files_3_partitions".to_string(),
2086                file_count: 30,
2087                overlap_factor: 0.8,
2088                target_partitions: 3,
2089                expected_partition_count: 7,
2090            },
2091            // Edge cases
2092            TestCase {
2093                name: "fewer_files_than_partitions".to_string(),
2094                file_count: 3,
2095                overlap_factor: 0.0,
2096                target_partitions: 10,
2097                expected_partition_count: 3, // Should only create as many partitions as files
2098            },
2099            TestCase {
2100                name: "single_file".to_string(),
2101                file_count: 1,
2102                overlap_factor: 0.0,
2103                target_partitions: 5,
2104                expected_partition_count: 1, // Should create only one partition
2105            },
2106            TestCase {
2107                name: "empty_files".to_string(),
2108                file_count: 0,
2109                overlap_factor: 0.0,
2110                target_partitions: 3,
2111                expected_partition_count: 0, // Empty result for empty input
2112            },
2113        ];
2114
2115        for case in test_cases {
2116            println!("Running test case: {}", case.name);
2117
2118            // Generate files using bench utility function
2119            let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2120
2121            // Call the function under test
2122            let result =
2123                FileScanConfig::split_groups_by_statistics_with_target_partitions(
2124                    &schema,
2125                    &file_groups,
2126                    &sort_ordering,
2127                    case.target_partitions,
2128                )?;
2129
2130            // Verify results
2131            println!(
2132                "Created {} partitions (target was {})",
2133                result.len(),
2134                case.target_partitions
2135            );
2136
2137            // Check partition count
2138            assert_eq!(
2139                result.len(),
2140                case.expected_partition_count,
2141                "Case '{}': Unexpected partition count",
2142                case.name
2143            );
2144
2145            // Verify sort integrity
2146            assert!(
2147                verify_sort_integrity(&result),
2148                "Case '{}': Files within partitions are not properly ordered",
2149                case.name
2150            );
2151
2152            // Distribution check for partitions
2153            if case.file_count > 1 && case.expected_partition_count > 1 {
2154                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2155                let max_size = *group_sizes.iter().max().unwrap();
2156                let min_size = *group_sizes.iter().min().unwrap();
2157
2158                // Check partition balancing - difference shouldn't be extreme
2159                let avg_files_per_partition =
2160                    case.file_count as f64 / case.expected_partition_count as f64;
2161                assert!(
2162                    (max_size as f64) < 2.0 * avg_files_per_partition,
2163                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2164                    case.name,
2165                    max_size,
2166                    avg_files_per_partition
2167                );
2168
2169                println!("Distribution - min files: {min_size}, max files: {max_size}");
2170            }
2171        }
2172
2173        // Test error case: zero target partitions
2174        let empty_groups: Vec<FileGroup> = vec![];
2175        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2176            &schema,
2177            &empty_groups,
2178            &sort_ordering,
2179            0,
2180        )
2181        .unwrap_err();
2182
2183        assert!(
2184            err.to_string()
2185                .contains("target_partitions must be greater than 0"),
2186            "Expected error for zero target partitions"
2187        );
2188
2189        Ok(())
2190    }
2191
2192    #[test]
2193    fn test_partition_statistics_projection() {
2194        // This test verifies that partition_statistics applies projection correctly.
2195        // The old implementation had a bug where it returned file group statistics
2196        // without applying the projection, returning all column statistics instead
2197        // of just the projected ones.
2198
2199        use crate::source::DataSourceExec;
2200        use datafusion_physical_plan::ExecutionPlan;
2201
2202        // Create a schema with 4 columns
2203        let schema = Arc::new(Schema::new(vec![
2204            Field::new("col0", DataType::Int32, false),
2205            Field::new("col1", DataType::Int32, false),
2206            Field::new("col2", DataType::Int32, false),
2207            Field::new("col3", DataType::Int32, false),
2208        ]));
2209
2210        // Create statistics for all 4 columns
2211        let file_group_stats = Statistics {
2212            num_rows: Precision::Exact(100),
2213            total_byte_size: Precision::Exact(1024),
2214            column_statistics: vec![
2215                ColumnStatistics {
2216                    null_count: Precision::Exact(0),
2217                    ..ColumnStatistics::new_unknown()
2218                },
2219                ColumnStatistics {
2220                    null_count: Precision::Exact(5),
2221                    ..ColumnStatistics::new_unknown()
2222                },
2223                ColumnStatistics {
2224                    null_count: Precision::Exact(10),
2225                    ..ColumnStatistics::new_unknown()
2226                },
2227                ColumnStatistics {
2228                    null_count: Precision::Exact(15),
2229                    ..ColumnStatistics::new_unknown()
2230                },
2231            ],
2232        };
2233
2234        // Create a file group with statistics
2235        let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2236            .with_statistics(Arc::new(file_group_stats));
2237
2238        let table_schema = TableSchema::new(Arc::clone(&schema), vec![]);
2239
2240        // Create a FileScanConfig with projection: only keep columns 0 and 2
2241        let config = FileScanConfigBuilder::new(
2242            ObjectStoreUrl::parse("test:///").unwrap(),
2243            Arc::new(MockSource::new(table_schema.clone())),
2244        )
2245        .with_projection_indices(Some(vec![0, 2]))
2246        .unwrap() // Only project columns 0 and 2
2247        .with_file_groups(vec![file_group])
2248        .build();
2249
2250        // Create a DataSourceExec from the config
2251        let exec = DataSourceExec::from_data_source(config);
2252
2253        // Get statistics for partition 0
2254        let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2255
2256        // Verify that only 2 columns are in the statistics (the projected ones)
2257        assert_eq!(
2258            partition_stats.column_statistics.len(),
2259            2,
2260            "Expected 2 column statistics (projected), but got {}",
2261            partition_stats.column_statistics.len()
2262        );
2263
2264        // Verify the column statistics are for columns 0 and 2
2265        assert_eq!(
2266            partition_stats.column_statistics[0].null_count,
2267            Precision::Exact(0),
2268            "First projected column should be col0 with 0 nulls"
2269        );
2270        assert_eq!(
2271            partition_stats.column_statistics[1].null_count,
2272            Precision::Exact(10),
2273            "Second projected column should be col2 with 10 nulls"
2274        );
2275
2276        // Verify row count and byte size
2277        assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2278        assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
2279    }
2280
2281    #[test]
2282    fn test_output_partitioning_not_partitioned_by_file_group() {
2283        let file_schema = aggr_test_schema();
2284        let partition_col =
2285            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), false);
2286
2287        let config = config_for_projection(
2288            Arc::clone(&file_schema),
2289            None,
2290            Statistics::new_unknown(&file_schema),
2291            vec![partition_col],
2292        );
2293
2294        // partitioned_by_file_group defaults to false
2295        let partitioning = config.output_partitioning();
2296        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2297    }
2298
2299    #[test]
2300    fn test_output_partitioning_no_partition_columns() {
2301        let file_schema = aggr_test_schema();
2302        let mut config = config_for_projection(
2303            Arc::clone(&file_schema),
2304            None,
2305            Statistics::new_unknown(&file_schema),
2306            vec![], // No partition columns
2307        );
2308        config.partitioned_by_file_group = true;
2309
2310        let partitioning = config.output_partitioning();
2311        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2312    }
2313
2314    #[test]
2315    fn test_output_partitioning_with_partition_columns() {
2316        let file_schema = aggr_test_schema();
2317
2318        // Test single partition column
2319        let single_partition_col = vec![Field::new(
2320            "date",
2321            wrap_partition_type_in_dict(DataType::Utf8),
2322            false,
2323        )];
2324
2325        let mut config = config_for_projection(
2326            Arc::clone(&file_schema),
2327            None,
2328            Statistics::new_unknown(&file_schema),
2329            single_partition_col,
2330        );
2331        config.partitioned_by_file_group = true;
2332        config.file_groups = vec![
2333            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2334            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2335            FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
2336        ];
2337
2338        let partitioning = config.output_partitioning();
2339        match partitioning {
2340            Partitioning::Hash(exprs, num_partitions) => {
2341                assert_eq!(num_partitions, 3);
2342                assert_eq!(exprs.len(), 1);
2343                assert_eq!(
2344                    exprs[0].as_any().downcast_ref::<Column>().unwrap().name(),
2345                    "date"
2346                );
2347            }
2348            _ => panic!("Expected Hash partitioning"),
2349        }
2350
2351        // Test multiple partition columns
2352        let multiple_partition_cols = vec![
2353            Field::new("year", wrap_partition_type_in_dict(DataType::Utf8), false),
2354            Field::new("month", wrap_partition_type_in_dict(DataType::Utf8), false),
2355        ];
2356
2357        config = config_for_projection(
2358            Arc::clone(&file_schema),
2359            None,
2360            Statistics::new_unknown(&file_schema),
2361            multiple_partition_cols,
2362        );
2363        config.partitioned_by_file_group = true;
2364        config.file_groups = vec![
2365            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2366            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2367        ];
2368
2369        let partitioning = config.output_partitioning();
2370        match partitioning {
2371            Partitioning::Hash(exprs, num_partitions) => {
2372                assert_eq!(num_partitions, 2);
2373                assert_eq!(exprs.len(), 2);
2374                let col_names: Vec<_> = exprs
2375                    .iter()
2376                    .map(|e| e.as_any().downcast_ref::<Column>().unwrap().name())
2377                    .collect();
2378                assert_eq!(col_names, vec!["year", "month"]);
2379            }
2380            _ => panic!("Expected Hash partitioning"),
2381        }
2382    }
2383}