datafusion_datasource/
file_scan_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21use crate::file_groups::FileGroup;
22use crate::{
23    PartitionedFile, display::FileGroupsDisplay, file::FileSource,
24    file_compression_type::FileCompressionType, file_stream::FileStream,
25    source::DataSource, statistics::MinMaxStatistics,
26};
27use arrow::datatypes::FieldRef;
28use arrow::datatypes::{DataType, Schema, SchemaRef};
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::{
31    Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
32};
33use datafusion_execution::{
34    SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
35};
36use datafusion_expr::Operator;
37
38use datafusion_physical_expr::equivalence::project_orderings;
39use datafusion_physical_expr::expressions::{BinaryExpr, Column};
40use datafusion_physical_expr::projection::ProjectionExprs;
41use datafusion_physical_expr::utils::reassign_expr_columns;
42use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
43use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
44use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
45use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
46use datafusion_physical_plan::SortOrderPushdownResult;
47use datafusion_physical_plan::coop::cooperative;
48use datafusion_physical_plan::execution_plan::SchedulingType;
49use datafusion_physical_plan::{
50    DisplayAs, DisplayFormatType,
51    display::{ProjectSchemaDisplay, display_orderings},
52    filter_pushdown::FilterPushdownPropagation,
53    metrics::ExecutionPlanMetricsSet,
54};
55use log::{debug, warn};
56use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
57
58/// The base configurations for a [`DataSourceExec`], the a physical plan for
59/// any given file format.
60///
61/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a ``FileScanConfig`.
62///
63/// # Example
64/// ```
65/// # use std::any::Any;
66/// # use std::sync::Arc;
67/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
68/// # use object_store::ObjectStore;
69/// # use datafusion_common::Result;
70/// # use datafusion_datasource::file::FileSource;
71/// # use datafusion_datasource::file_groups::FileGroup;
72/// # use datafusion_datasource::PartitionedFile;
73/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
74/// # use datafusion_datasource::file_stream::FileOpener;
75/// # use datafusion_datasource::source::DataSourceExec;
76/// # use datafusion_datasource::table_schema::TableSchema;
77/// # use datafusion_execution::object_store::ObjectStoreUrl;
78/// # use datafusion_physical_expr::projection::ProjectionExprs;
79/// # use datafusion_physical_plan::ExecutionPlan;
80/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
81/// # let file_schema = Arc::new(Schema::new(vec![
82/// #  Field::new("c1", DataType::Int32, false),
83/// #  Field::new("c2", DataType::Int32, false),
84/// #  Field::new("c3", DataType::Int32, false),
85/// #  Field::new("c4", DataType::Int32, false),
86/// # ]));
87/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
88/// #[derive(Clone)]
89/// # struct ParquetSource {
90/// #    table_schema: TableSchema,
91/// # };
92/// # impl FileSource for ParquetSource {
93/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Result<Arc<dyn FileOpener>> { unimplemented!() }
94/// #  fn as_any(&self) -> &dyn Any { self  }
95/// #  fn table_schema(&self) -> &TableSchema { &self.table_schema }
96/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
97/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
98/// #  fn file_type(&self) -> &str { "parquet" }
99/// #  // Note that this implementation drops the projection on the floor, it is not complete!
100/// #  fn try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result<Option<Arc<dyn FileSource>>> { Ok(Some(Arc::new(self.clone()) as Arc<dyn FileSource>)) }
101/// #  }
102/// # impl ParquetSource {
103/// #  fn new(table_schema: impl Into<TableSchema>) -> Self { Self {table_schema: table_schema.into()} }
104/// # }
105/// // create FileScan config for reading parquet files from file://
106/// let object_store_url = ObjectStoreUrl::local_filesystem();
107/// let file_source = Arc::new(ParquetSource::new(file_schema.clone()));
108/// let config = FileScanConfigBuilder::new(object_store_url, file_source)
109///   .with_limit(Some(1000))            // read only the first 1000 records
110///   .with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3
111///   .expect("Failed to push down projection")
112///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
113///   .with_file(PartitionedFile::new("file1.parquet", 1234))
114///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
115///   // in a  single row group
116///   .with_file_group(FileGroup::new(vec![
117///    PartitionedFile::new("file2.parquet", 56),
118///    PartitionedFile::new("file3.parquet", 78),
119///   ])).build();
120/// // create an execution plan from the config
121/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
122/// ```
123///
124/// [`DataSourceExec`]: crate::source::DataSourceExec
125/// [`DataSourceExec::from_data_source`]: crate::source::DataSourceExec::from_data_source
126#[derive(Clone)]
127pub struct FileScanConfig {
128    /// Object store URL, used to get an [`ObjectStore`] instance from
129    /// [`RuntimeEnv::object_store`]
130    ///
131    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
132    /// as `file://` or `s3://my_bucket`. It should not include the path to the
133    /// file itself. The relevant URL prefix must be registered via
134    /// [`RuntimeEnv::register_object_store`]
135    ///
136    /// [`ObjectStore`]: object_store::ObjectStore
137    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
138    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
139    pub object_store_url: ObjectStoreUrl,
140    /// List of files to be processed, grouped into partitions
141    ///
142    /// Each file must have a schema of `file_schema` or a subset. If
143    /// a particular file has a subset, the missing columns are
144    /// padded with NULLs.
145    ///
146    /// DataFusion may attempt to read each partition of files
147    /// concurrently, however files *within* a partition will be read
148    /// sequentially, one after the next.
149    pub file_groups: Vec<FileGroup>,
150    /// Table constraints
151    pub constraints: Constraints,
152    /// The maximum number of records to read from this plan. If `None`,
153    /// all records after filtering are returned.
154    pub limit: Option<usize>,
155    /// All equivalent lexicographical orderings that describe the schema.
156    pub output_ordering: Vec<LexOrdering>,
157    /// File compression type
158    pub file_compression_type: FileCompressionType,
159    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
160    pub file_source: Arc<dyn FileSource>,
161    /// Batch size while creating new batches
162    /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
163    pub batch_size: Option<usize>,
164    /// Expression adapter used to adapt filters and projections that are pushed down into the scan
165    /// from the logical schema to the physical schema of the file.
166    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
167    /// Unprojected statistics for the table (file schema + partition columns).
168    /// These are projected on-demand via `projected_stats()`.
169    ///
170    /// Note that this field is pub(crate) because accessing it directly from outside
171    /// would be incorrect if there are filters being applied, thus this should be accessed
172    /// via [`FileScanConfig::statistics`].
173    pub(crate) statistics: Statistics,
174    /// When true, file_groups are organized by partition column values
175    /// and output_partitioning will return Hash partitioning on partition columns.
176    /// This allows the optimizer to skip hash repartitioning for aggregates and joins
177    /// on partition columns.
178    ///
179    /// If the number of file partitions > target_partitions, the file partitions will be grouped
180    /// in a round-robin fashion such that number of file partitions = target_partitions.
181    pub partitioned_by_file_group: bool,
182}
183
184/// A builder for [`FileScanConfig`]'s.
185///
186/// Example:
187///
188/// ```rust
189/// # use std::sync::Arc;
190/// # use arrow::datatypes::{DataType, Field, Schema};
191/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig};
192/// # use datafusion_datasource::file_compression_type::FileCompressionType;
193/// # use datafusion_datasource::file_groups::FileGroup;
194/// # use datafusion_datasource::PartitionedFile;
195/// # use datafusion_datasource::table_schema::TableSchema;
196/// # use datafusion_execution::object_store::ObjectStoreUrl;
197/// # use datafusion_common::Statistics;
198/// # use datafusion_datasource::file::FileSource;
199///
200/// # fn main() {
201/// # fn with_source(file_source: Arc<dyn FileSource>) {
202///     // Create a schema for our Parquet files
203///     let file_schema = Arc::new(Schema::new(vec![
204///         Field::new("id", DataType::Int32, false),
205///         Field::new("value", DataType::Utf8, false),
206///     ]));
207///
208///     // Create partition columns
209///     let partition_cols = vec![
210///         Arc::new(Field::new("date", DataType::Utf8, false)),
211///     ];
212///
213///     // Create table schema with file schema and partition columns
214///     let table_schema = TableSchema::new(file_schema, partition_cols);
215///
216///     // Create a builder for scanning Parquet files from a local filesystem
217///     let config = FileScanConfigBuilder::new(
218///         ObjectStoreUrl::local_filesystem(),
219///         file_source,
220///     )
221///     // Set a limit of 1000 rows
222///     .with_limit(Some(1000))
223///     // Project only the first column
224///     .with_projection_indices(Some(vec![0]))
225///     .expect("Failed to push down projection")
226///     // Add a file group with two files
227///     .with_file_group(FileGroup::new(vec![
228///         PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024),
229///         PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048),
230///     ]))
231///     // Set compression type
232///     .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
233///     // Build the final config
234///     .build();
235/// # }
236/// # }
237/// ```
238#[derive(Clone)]
239pub struct FileScanConfigBuilder {
240    object_store_url: ObjectStoreUrl,
241    file_source: Arc<dyn FileSource>,
242    limit: Option<usize>,
243    constraints: Option<Constraints>,
244    file_groups: Vec<FileGroup>,
245    statistics: Option<Statistics>,
246    output_ordering: Vec<LexOrdering>,
247    file_compression_type: Option<FileCompressionType>,
248    batch_size: Option<usize>,
249    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
250    partitioned_by_file_group: bool,
251}
252
253impl FileScanConfigBuilder {
254    /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files.
255    ///
256    /// # Parameters:
257    /// * `object_store_url`: See [`FileScanConfig::object_store_url`]
258    /// * `file_source`: See [`FileScanConfig::file_source`]. The file source must have
259    ///   a schema set via its constructor.
260    pub fn new(
261        object_store_url: ObjectStoreUrl,
262        file_source: Arc<dyn FileSource>,
263    ) -> Self {
264        Self {
265            object_store_url,
266            file_source,
267            file_groups: vec![],
268            statistics: None,
269            output_ordering: vec![],
270            file_compression_type: None,
271            limit: None,
272            constraints: None,
273            batch_size: None,
274            expr_adapter_factory: None,
275            partitioned_by_file_group: false,
276        }
277    }
278
279    /// Set the maximum number of records to read from this plan. If `None`,
280    /// all records after filtering are returned.
281    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
282        self.limit = limit;
283        self
284    }
285
286    /// Set the file source for scanning files.
287    ///
288    /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
289    /// after the builder has been created.
290    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
291        self.file_source = file_source;
292        self
293    }
294
295    pub fn table_schema(&self) -> &SchemaRef {
296        self.file_source.table_schema().table_schema()
297    }
298
299    /// Set the columns on which to project the data. Indexes that are higher than the
300    /// number of columns of `file_schema` refer to `table_partition_cols`.
301    ///
302    /// # Deprecated
303    /// Use [`Self::with_projection_indices`] instead. This method will be removed in a future release.
304    #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
305    pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
306        match self.clone().with_projection_indices(indices) {
307            Ok(builder) => builder,
308            Err(e) => {
309                warn!(
310                    "Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
311                );
312                self
313            }
314        }
315    }
316
317    /// Set the columns on which to project the data using column indices.
318    ///
319    /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`.
320    pub fn with_projection_indices(
321        mut self,
322        indices: Option<Vec<usize>>,
323    ) -> Result<Self> {
324        let projection_exprs = indices.map(|indices| {
325            ProjectionExprs::from_indices(
326                &indices,
327                self.file_source.table_schema().table_schema(),
328            )
329        });
330        let Some(projection_exprs) = projection_exprs else {
331            return Ok(self);
332        };
333        let new_source = self
334            .file_source
335            .try_pushdown_projection(&projection_exprs)
336            .map_err(|e| {
337                internal_datafusion_err!(
338                    "Failed to push down projection in FileScanConfigBuilder::build: {e}"
339                )
340            })?;
341        if let Some(new_source) = new_source {
342            self.file_source = new_source;
343        } else {
344            internal_err!(
345                "FileSource {} does not support projection pushdown",
346                self.file_source.file_type()
347            )?;
348        }
349        Ok(self)
350    }
351
352    /// Set the table constraints
353    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
354        self.constraints = Some(constraints);
355        self
356    }
357
358    /// Set the estimated overall statistics of the files, taking `filters` into account.
359    /// Defaults to [`Statistics::new_unknown`].
360    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
361        self.statistics = Some(statistics);
362        self
363    }
364
365    /// Set the list of files to be processed, grouped into partitions.
366    ///
367    /// Each file must have a schema of `file_schema` or a subset. If
368    /// a particular file has a subset, the missing columns are
369    /// padded with NULLs.
370    ///
371    /// DataFusion may attempt to read each partition of files
372    /// concurrently, however files *within* a partition will be read
373    /// sequentially, one after the next.
374    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
375        self.file_groups = file_groups;
376        self
377    }
378
379    /// Add a new file group
380    ///
381    /// See [`Self::with_file_groups`] for more information
382    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
383        self.file_groups.push(file_group);
384        self
385    }
386
387    /// Add a file as a single group
388    ///
389    /// See [`Self::with_file_groups`] for more information.
390    pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
391        self.with_file_group(FileGroup::new(vec![partitioned_file]))
392    }
393
394    /// Set the output ordering of the files
395    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
396        self.output_ordering = output_ordering;
397        self
398    }
399
400    /// Set the file compression type
401    pub fn with_file_compression_type(
402        mut self,
403        file_compression_type: FileCompressionType,
404    ) -> Self {
405        self.file_compression_type = Some(file_compression_type);
406        self
407    }
408
409    /// Set the batch_size property
410    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
411        self.batch_size = batch_size;
412        self
413    }
414
415    /// Register an expression adapter used to adapt filters and projections that are pushed down into the scan
416    /// from the logical schema to the physical schema of the file.
417    /// This can include things like:
418    /// - Column ordering changes
419    /// - Handling of missing columns
420    /// - Rewriting expression to use pre-computed values or file format specific optimizations
421    pub fn with_expr_adapter(
422        mut self,
423        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
424    ) -> Self {
425        self.expr_adapter_factory = expr_adapter;
426        self
427    }
428
429    /// Set whether file groups are organized by partition column values.
430    ///
431    /// When set to true, the output partitioning will be declared as Hash partitioning
432    /// on the partition columns.
433    pub fn with_partitioned_by_file_group(
434        mut self,
435        partitioned_by_file_group: bool,
436    ) -> Self {
437        self.partitioned_by_file_group = partitioned_by_file_group;
438        self
439    }
440
441    /// Build the final [`FileScanConfig`] with all the configured settings.
442    ///
443    /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
444    /// Any unset optional fields will use their default values.
445    ///
446    /// # Errors
447    /// Returns an error if projection pushdown fails or if schema operations fail.
448    pub fn build(self) -> FileScanConfig {
449        let Self {
450            object_store_url,
451            file_source,
452            limit,
453            constraints,
454            file_groups,
455            statistics,
456            output_ordering,
457            file_compression_type,
458            batch_size,
459            expr_adapter_factory: expr_adapter,
460            partitioned_by_file_group,
461        } = self;
462
463        let constraints = constraints.unwrap_or_default();
464        let statistics = statistics.unwrap_or_else(|| {
465            Statistics::new_unknown(file_source.table_schema().table_schema())
466        });
467        let file_compression_type =
468            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
469
470        FileScanConfig {
471            object_store_url,
472            file_source,
473            limit,
474            constraints,
475            file_groups,
476            output_ordering,
477            file_compression_type,
478            batch_size,
479            expr_adapter_factory: expr_adapter,
480            statistics,
481            partitioned_by_file_group,
482        }
483    }
484}
485
486impl From<FileScanConfig> for FileScanConfigBuilder {
487    fn from(config: FileScanConfig) -> Self {
488        Self {
489            object_store_url: config.object_store_url,
490            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
491            file_groups: config.file_groups,
492            statistics: Some(config.statistics),
493            output_ordering: config.output_ordering,
494            file_compression_type: Some(config.file_compression_type),
495            limit: config.limit,
496            constraints: Some(config.constraints),
497            batch_size: config.batch_size,
498            expr_adapter_factory: config.expr_adapter_factory,
499            partitioned_by_file_group: config.partitioned_by_file_group,
500        }
501    }
502}
503
504impl DataSource for FileScanConfig {
505    fn open(
506        &self,
507        partition: usize,
508        context: Arc<TaskContext>,
509    ) -> Result<SendableRecordBatchStream> {
510        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
511        let batch_size = self
512            .batch_size
513            .unwrap_or_else(|| context.session_config().batch_size());
514
515        let source = self.file_source.with_batch_size(batch_size);
516
517        let opener = source.create_file_opener(object_store, self, partition)?;
518
519        let stream = FileStream::new(self, partition, opener, source.metrics())?;
520        Ok(Box::pin(cooperative(stream)))
521    }
522
523    fn as_any(&self) -> &dyn Any {
524        self
525    }
526
527    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
528        match t {
529            DisplayFormatType::Default | DisplayFormatType::Verbose => {
530                let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
531                let orderings = get_projected_output_ordering(self, &schema);
532
533                write!(f, "file_groups=")?;
534                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
535
536                if !schema.fields().is_empty() {
537                    if let Some(projection) = self.file_source.projection() {
538                        // This matches what ProjectionExec does.
539                        // TODO: can we put this into ProjectionExprs so that it's shared code?
540                        let expr: Vec<String> = projection
541                            .as_ref()
542                            .iter()
543                            .map(|proj_expr| {
544                                if let Some(column) =
545                                    proj_expr.expr.as_any().downcast_ref::<Column>()
546                                {
547                                    if column.name() == proj_expr.alias {
548                                        column.name().to_string()
549                                    } else {
550                                        format!(
551                                            "{} as {}",
552                                            proj_expr.expr, proj_expr.alias
553                                        )
554                                    }
555                                } else {
556                                    format!("{} as {}", proj_expr.expr, proj_expr.alias)
557                                }
558                            })
559                            .collect();
560                        write!(f, ", projection=[{}]", expr.join(", "))?;
561                    } else {
562                        write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
563                    }
564                }
565
566                if let Some(limit) = self.limit {
567                    write!(f, ", limit={limit}")?;
568                }
569
570                display_orderings(f, &orderings)?;
571
572                if !self.constraints.is_empty() {
573                    write!(f, ", {}", self.constraints)?;
574                }
575
576                self.fmt_file_source(t, f)
577            }
578            DisplayFormatType::TreeRender => {
579                writeln!(f, "format={}", self.file_source.file_type())?;
580                self.file_source.fmt_extra(t, f)?;
581                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
582                writeln!(f, "files={num_files}")?;
583                Ok(())
584            }
585        }
586    }
587
588    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
589    fn repartitioned(
590        &self,
591        target_partitions: usize,
592        repartition_file_min_size: usize,
593        output_ordering: Option<LexOrdering>,
594    ) -> Result<Option<Arc<dyn DataSource>>> {
595        // When files are grouped by partition values, we cannot allow byte-range
596        // splitting. It would mix rows from different partition values across
597        // file groups, breaking the Hash partitioning.
598        if self.partitioned_by_file_group {
599            return Ok(None);
600        }
601
602        let source = self.file_source.repartitioned(
603            target_partitions,
604            repartition_file_min_size,
605            output_ordering,
606            self,
607        )?;
608
609        Ok(source.map(|s| Arc::new(s) as _))
610    }
611
612    /// Returns the output partitioning for this file scan.
613    ///
614    /// When `partitioned_by_file_group` is true, this returns `Partitioning::Hash` on
615    /// the Hive partition columns, allowing the optimizer to skip hash repartitioning
616    /// for aggregates and joins on those columns.
617    ///
618    /// Tradeoffs
619    /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with
620    ///   `GROUP BY` or `ORDER BY` on partition columns.
621    /// - Cost: Files are grouped by partition values rather than split by byte
622    ///   ranges, which may reduce I/O parallelism when partition sizes are uneven.
623    ///   For simple aggregations without `ORDER BY`, this cost may outweigh the benefit.
624    ///
625    /// Follow-up Work
626    /// - Idea: Could allow byte-range splitting within partition-aware groups,
627    ///   preserving I/O parallelism while maintaining partition semantics.
628    fn output_partitioning(&self) -> Partitioning {
629        if self.partitioned_by_file_group {
630            let partition_cols = self.table_partition_cols();
631            if !partition_cols.is_empty() {
632                let projected_schema = match self.projected_schema() {
633                    Ok(schema) => schema,
634                    Err(_) => {
635                        debug!(
636                            "Could not get projected schema, falling back to UnknownPartitioning."
637                        );
638                        return Partitioning::UnknownPartitioning(self.file_groups.len());
639                    }
640                };
641
642                // Build Column expressions for partition columns based on their
643                // position in the projected schema
644                let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
645                for partition_col in partition_cols {
646                    if let Some((idx, _)) = projected_schema
647                        .fields()
648                        .iter()
649                        .enumerate()
650                        .find(|(_, f)| f.name() == partition_col.name())
651                    {
652                        exprs.push(Arc::new(Column::new(partition_col.name(), idx)));
653                    }
654                }
655
656                if exprs.len() == partition_cols.len() {
657                    return Partitioning::Hash(exprs, self.file_groups.len());
658                }
659            }
660        }
661        Partitioning::UnknownPartitioning(self.file_groups.len())
662    }
663
664    fn eq_properties(&self) -> EquivalenceProperties {
665        let schema = self.file_source.table_schema().table_schema();
666        let mut eq_properties = EquivalenceProperties::new_with_orderings(
667            Arc::clone(schema),
668            self.output_ordering.clone(),
669        )
670        .with_constraints(self.constraints.clone());
671
672        if let Some(filter) = self.file_source.filter() {
673            // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
674            // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
675            match Self::add_filter_equivalence_info(&filter, &mut eq_properties, schema) {
676                Ok(()) => {}
677                Err(e) => {
678                    warn!("Failed to add filter equivalence info: {e}");
679                    #[cfg(debug_assertions)]
680                    panic!("Failed to add filter equivalence info: {e}");
681                }
682            }
683        }
684
685        if let Some(projection) = self.file_source.projection() {
686            match (
687                projection.project_schema(schema),
688                projection.projection_mapping(schema),
689            ) {
690                (Ok(output_schema), Ok(mapping)) => {
691                    eq_properties =
692                        eq_properties.project(&mapping, Arc::new(output_schema));
693                }
694                (Err(e), _) | (_, Err(e)) => {
695                    warn!("Failed to project equivalence properties: {e}");
696                    #[cfg(debug_assertions)]
697                    panic!("Failed to project equivalence properties: {e}");
698                }
699            }
700        }
701
702        eq_properties
703    }
704
705    fn scheduling_type(&self) -> SchedulingType {
706        SchedulingType::Cooperative
707    }
708
709    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
710        if let Some(partition) = partition {
711            // Get statistics for a specific partition
712            // Note: FileGroup statistics include partition columns (computed from partition_values)
713            if let Some(file_group) = self.file_groups.get(partition)
714                && let Some(stat) = file_group.file_statistics(None)
715            {
716                // Project the statistics based on the projection
717                let output_schema = self.projected_schema()?;
718                return if let Some(projection) = self.file_source.projection() {
719                    projection.project_statistics(stat.clone(), &output_schema)
720                } else {
721                    Ok(stat.clone())
722                };
723            }
724            // If no statistics available for this partition, return unknown
725            Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
726        } else {
727            // Return aggregate statistics across all partitions
728            let statistics = self.statistics();
729            let projection = self.file_source.projection();
730            let output_schema = self.projected_schema()?;
731            if let Some(projection) = &projection {
732                projection.project_statistics(statistics.clone(), &output_schema)
733            } else {
734                Ok(statistics)
735            }
736        }
737    }
738
739    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
740        let source = FileScanConfigBuilder::from(self.clone())
741            .with_limit(limit)
742            .build();
743        Some(Arc::new(source))
744    }
745
746    fn fetch(&self) -> Option<usize> {
747        self.limit
748    }
749
750    fn metrics(&self) -> ExecutionPlanMetricsSet {
751        self.file_source.metrics().clone()
752    }
753
754    fn try_swapping_with_projection(
755        &self,
756        projection: &ProjectionExprs,
757    ) -> Result<Option<Arc<dyn DataSource>>> {
758        match self.file_source.try_pushdown_projection(projection)? {
759            Some(new_source) => {
760                let mut new_file_scan_config = self.clone();
761                new_file_scan_config.file_source = new_source;
762                Ok(Some(Arc::new(new_file_scan_config) as Arc<dyn DataSource>))
763            }
764            None => Ok(None),
765        }
766    }
767
768    fn try_pushdown_filters(
769        &self,
770        filters: Vec<Arc<dyn PhysicalExpr>>,
771        config: &ConfigOptions,
772    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
773        // Remap filter Column indices to match the table schema (file + partition columns).
774        // This is necessary because filters may have been created against a different schema
775        // (e.g., after projection pushdown) and need to be remapped to the table schema
776        // before being passed to the file source and ultimately serialized.
777        // For example, the filter being pushed down is `c1_c2 > 5` and it was created
778        // against the output schema of the this `DataSource` which has projection `c1 + c2 as c1_c2`.
779        // Thus we need to rewrite the filter back to `c1 + c2 > 5` before passing it to the file source.
780        let table_schema = self.file_source.table_schema().table_schema();
781        // If there's a projection with aliases, first map the filters back through
782        // the projection expressions before remapping to the table schema.
783        let filters_to_remap = if let Some(projection) = self.file_source.projection() {
784            use datafusion_physical_plan::projection::update_expr;
785            filters
786                .into_iter()
787                .map(|filter| {
788                    update_expr(&filter, projection.as_ref(), true)?.ok_or_else(|| {
789                        internal_datafusion_err!(
790                            "Failed to map filter expression through projection: {}",
791                            filter
792                        )
793                    })
794                })
795                .collect::<Result<Vec<_>>>()?
796        } else {
797            filters
798        };
799        // Now remap column indices to match the table schema.
800        let remapped_filters: Result<Vec<_>> = filters_to_remap
801            .into_iter()
802            .map(|filter| reassign_expr_columns(filter, table_schema.as_ref()))
803            .collect();
804        let remapped_filters = remapped_filters?;
805
806        let result = self
807            .file_source
808            .try_pushdown_filters(remapped_filters, config)?;
809        match result.updated_node {
810            Some(new_file_source) => {
811                let mut new_file_scan_config = self.clone();
812                new_file_scan_config.file_source = new_file_source;
813                Ok(FilterPushdownPropagation {
814                    filters: result.filters,
815                    updated_node: Some(Arc::new(new_file_scan_config) as _),
816                })
817            }
818            None => {
819                // If the file source does not support filter pushdown, return the original config
820                Ok(FilterPushdownPropagation {
821                    filters: result.filters,
822                    updated_node: None,
823                })
824            }
825        }
826    }
827
828    fn try_pushdown_sort(
829        &self,
830        order: &[PhysicalSortExpr],
831    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
832        // Delegate to FileSource to check if reverse scanning can satisfy the request.
833        let pushdown_result = self
834            .file_source
835            .try_reverse_output(order, &self.eq_properties())?;
836
837        match pushdown_result {
838            SortOrderPushdownResult::Exact { inner } => {
839                Ok(SortOrderPushdownResult::Exact {
840                    inner: self.rebuild_with_source(inner, true)?,
841                })
842            }
843            SortOrderPushdownResult::Inexact { inner } => {
844                Ok(SortOrderPushdownResult::Inexact {
845                    inner: self.rebuild_with_source(inner, false)?,
846                })
847            }
848            SortOrderPushdownResult::Unsupported => {
849                Ok(SortOrderPushdownResult::Unsupported)
850            }
851        }
852    }
853}
854
855impl FileScanConfig {
856    /// Get the file schema (schema of the files without partition columns)
857    pub fn file_schema(&self) -> &SchemaRef {
858        self.file_source.table_schema().file_schema()
859    }
860
861    /// Get the table partition columns
862    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
863        self.file_source.table_schema().table_partition_cols()
864    }
865
866    /// Returns the unprojected table statistics, marking them as inexact if filters are present.
867    ///
868    /// When filters are pushed down (including pruning predicates and bloom filters),
869    /// we can't guarantee the statistics are exact because we don't know how many
870    /// rows will be filtered out.
871    pub fn statistics(&self) -> Statistics {
872        if self.file_source.filter().is_some() {
873            self.statistics.clone().to_inexact()
874        } else {
875            self.statistics.clone()
876        }
877    }
878
879    pub fn projected_schema(&self) -> Result<Arc<Schema>> {
880        let schema = self.file_source.table_schema().table_schema();
881        match self.file_source.projection() {
882            Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)),
883            None => Ok(Arc::clone(schema)),
884        }
885    }
886
887    fn add_filter_equivalence_info(
888        filter: &Arc<dyn PhysicalExpr>,
889        eq_properties: &mut EquivalenceProperties,
890        schema: &Schema,
891    ) -> Result<()> {
892        // Gather valid equality pairs from the filter expression
893        let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
894            // Ignore any binary expressions that reference non-existent columns in the current schema
895            // (e.g. due to unnecessary projections being removed)
896            reassign_expr_columns(Arc::clone(expr), schema)
897                .ok()
898                .and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
899                    Some(expr) if expr.op() == &Operator::Eq => {
900                        Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
901                    }
902                    _ => None,
903                })
904        });
905
906        for (lhs, rhs) in equal_pairs {
907            eq_properties.add_equal_conditions(lhs, rhs)?
908        }
909
910        Ok(())
911    }
912
913    /// Returns whether newlines in values are supported.
914    ///
915    /// This method always returns `false`. The actual newlines_in_values setting
916    /// has been moved to [`CsvSource`] and should be accessed via
917    /// [`CsvSource::csv_options()`] instead.
918    ///
919    /// [`CsvSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html
920    /// [`CsvSource::csv_options()`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html#method.csv_options
921    #[deprecated(
922        since = "52.0.0",
923        note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
924    )]
925    pub fn newlines_in_values(&self) -> bool {
926        false
927    }
928
929    #[deprecated(
930        since = "52.0.0",
931        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
932    )]
933    pub fn projected_constraints(&self) -> Constraints {
934        let props = self.eq_properties();
935        props.constraints().clone()
936    }
937
938    #[deprecated(
939        since = "52.0.0",
940        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
941    )]
942    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
943        #[expect(deprecated)]
944        self.file_source.projection().as_ref().map(|p| {
945            p.ordered_column_indices()
946                .into_iter()
947                .filter(|&i| i < self.file_schema().fields().len())
948                .collect::<Vec<_>>()
949        })
950    }
951
952    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
953    ///
954    /// The method distributes files across a target number of partitions while ensuring
955    /// files within each partition maintain sort order based on their min/max statistics.
956    ///
957    /// The algorithm works by:
958    /// 1. Takes files sorted by minimum values
959    /// 2. For each file:
960    ///   - Finds eligible groups (empty or where file's min > group's last max)
961    ///   - Selects the smallest eligible group
962    ///   - Creates a new group if needed
963    ///
964    /// # Parameters
965    /// * `table_schema`: Schema containing information about the columns
966    /// * `file_groups`: The original file groups to split
967    /// * `sort_order`: The lexicographical ordering to maintain within each group
968    /// * `target_partitions`: The desired number of output partitions
969    ///
970    /// # Returns
971    /// A new set of file groups, where files within each group are non-overlapping with respect to
972    /// their min/max statistics and maintain the specified sort order.
973    pub fn split_groups_by_statistics_with_target_partitions(
974        table_schema: &SchemaRef,
975        file_groups: &[FileGroup],
976        sort_order: &LexOrdering,
977        target_partitions: usize,
978    ) -> Result<Vec<FileGroup>> {
979        if target_partitions == 0 {
980            return Err(internal_datafusion_err!(
981                "target_partitions must be greater than 0"
982            ));
983        }
984
985        let flattened_files = file_groups
986            .iter()
987            .flat_map(FileGroup::iter)
988            .collect::<Vec<_>>();
989
990        if flattened_files.is_empty() {
991            return Ok(vec![]);
992        }
993
994        let statistics = MinMaxStatistics::new_from_files(
995            sort_order,
996            table_schema,
997            None,
998            flattened_files.iter().copied(),
999        )?;
1000
1001        let indices_sorted_by_min = statistics.min_values_sorted();
1002
1003        // Initialize with target_partitions empty groups
1004        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
1005
1006        for (idx, min) in indices_sorted_by_min {
1007            if let Some((_, group)) = file_groups_indices
1008                .iter_mut()
1009                .enumerate()
1010                .filter(|(_, group)| {
1011                    group.is_empty()
1012                        || min
1013                            > statistics
1014                                .max(*group.last().expect("groups should not be empty"))
1015                })
1016                .min_by_key(|(_, group)| group.len())
1017            {
1018                group.push(idx);
1019            } else {
1020                // Create a new group if no existing group fits
1021                file_groups_indices.push(vec![idx]);
1022            }
1023        }
1024
1025        // Remove any empty groups
1026        file_groups_indices.retain(|group| !group.is_empty());
1027
1028        // Assemble indices back into groups of PartitionedFiles
1029        Ok(file_groups_indices
1030            .into_iter()
1031            .map(|file_group_indices| {
1032                FileGroup::new(
1033                    file_group_indices
1034                        .into_iter()
1035                        .map(|idx| flattened_files[idx].clone())
1036                        .collect(),
1037                )
1038            })
1039            .collect())
1040    }
1041
1042    /// Attempts to do a bin-packing on files into file groups, such that any two files
1043    /// in a file group are ordered and non-overlapping with respect to their statistics.
1044    /// It will produce the smallest number of file groups possible.
1045    pub fn split_groups_by_statistics(
1046        table_schema: &SchemaRef,
1047        file_groups: &[FileGroup],
1048        sort_order: &LexOrdering,
1049    ) -> Result<Vec<FileGroup>> {
1050        let flattened_files = file_groups
1051            .iter()
1052            .flat_map(FileGroup::iter)
1053            .collect::<Vec<_>>();
1054        // First Fit:
1055        // * Choose the first file group that a file can be placed into.
1056        // * If it fits into no existing file groups, create a new one.
1057        //
1058        // By sorting files by min values and then applying first-fit bin packing,
1059        // we can produce the smallest number of file groups such that
1060        // files within a group are in order and non-overlapping.
1061        //
1062        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
1063        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
1064
1065        if flattened_files.is_empty() {
1066            return Ok(vec![]);
1067        }
1068
1069        let statistics = MinMaxStatistics::new_from_files(
1070            sort_order,
1071            table_schema,
1072            None,
1073            flattened_files.iter().copied(),
1074        )
1075        .map_err(|e| {
1076            e.context("construct min/max statistics for split_groups_by_statistics")
1077        })?;
1078
1079        let indices_sorted_by_min = statistics.min_values_sorted();
1080        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
1081
1082        for (idx, min) in indices_sorted_by_min {
1083            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
1084                // If our file is non-overlapping and comes _after_ the last file,
1085                // it fits in this file group.
1086                min > statistics.max(
1087                    *group
1088                        .last()
1089                        .expect("groups should be nonempty at construction"),
1090                )
1091            });
1092            match file_group_to_insert {
1093                Some(group) => group.push(idx),
1094                None => file_groups_indices.push(vec![idx]),
1095            }
1096        }
1097
1098        // Assemble indices back into groups of PartitionedFiles
1099        Ok(file_groups_indices
1100            .into_iter()
1101            .map(|file_group_indices| {
1102                file_group_indices
1103                    .into_iter()
1104                    .map(|idx| flattened_files[idx].clone())
1105                    .collect()
1106            })
1107            .collect())
1108    }
1109
1110    /// Write the data_type based on file_source
1111    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1112        write!(f, ", file_type={}", self.file_source.file_type())?;
1113        self.file_source.fmt_extra(t, f)
1114    }
1115
1116    /// Returns the file_source
1117    pub fn file_source(&self) -> &Arc<dyn FileSource> {
1118        &self.file_source
1119    }
1120
1121    /// Helper: Rebuild FileScanConfig with new file source
1122    fn rebuild_with_source(
1123        &self,
1124        new_file_source: Arc<dyn FileSource>,
1125        is_exact: bool,
1126    ) -> Result<Arc<dyn DataSource>> {
1127        let mut new_config = self.clone();
1128
1129        // Reverse file groups (FileScanConfig's responsibility)
1130        new_config.file_groups = new_config
1131            .file_groups
1132            .into_iter()
1133            .map(|group| {
1134                let mut files = group.into_inner();
1135                files.reverse();
1136                files.into()
1137            })
1138            .collect();
1139
1140        new_config.file_source = new_file_source;
1141
1142        // Phase 1: Clear output_ordering for Inexact
1143        // (we're only reversing row groups, not guaranteeing perfect ordering)
1144        if !is_exact {
1145            new_config.output_ordering = vec![];
1146        }
1147
1148        Ok(Arc::new(new_config))
1149    }
1150}
1151
1152impl Debug for FileScanConfig {
1153    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1154        write!(f, "FileScanConfig {{")?;
1155        write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1156
1157        write!(f, "statistics={:?}, ", self.statistics())?;
1158
1159        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1160        write!(f, "}}")
1161    }
1162}
1163
1164impl DisplayAs for FileScanConfig {
1165    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1166        let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
1167        let orderings = get_projected_output_ordering(self, &schema);
1168
1169        write!(f, "file_groups=")?;
1170        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1171
1172        if !schema.fields().is_empty() {
1173            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1174        }
1175
1176        if let Some(limit) = self.limit {
1177            write!(f, ", limit={limit}")?;
1178        }
1179
1180        display_orderings(f, &orderings)?;
1181
1182        if !self.constraints.is_empty() {
1183            write!(f, ", {}", self.constraints)?;
1184        }
1185
1186        Ok(())
1187    }
1188}
1189
1190/// Get the indices of columns in a projection if the projection is a simple
1191/// list of columns.
1192/// If there are any expressions other than columns, returns None.
1193fn ordered_column_indices_from_projection(
1194    projection: &ProjectionExprs,
1195) -> Option<Vec<usize>> {
1196    projection
1197        .expr_iter()
1198        .map(|e| {
1199            let index = e.as_any().downcast_ref::<Column>()?.index();
1200            Some(index)
1201        })
1202        .collect::<Option<Vec<usize>>>()
1203}
1204
1205/// The various listing tables does not attempt to read all files
1206/// concurrently, instead they will read files in sequence within a
1207/// partition.  This is an important property as it allows plans to
1208/// run against 1000s of files and not try to open them all
1209/// concurrently.
1210///
1211/// However, it means if we assign more than one file to a partition
1212/// the output sort order will not be preserved as illustrated in the
1213/// following diagrams:
1214///
1215/// When only 1 file is assigned to each partition, each partition is
1216/// correctly sorted on `(A, B, C)`
1217///
1218/// ```text
1219/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
1220///   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
1221/// ┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
1222///   │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
1223/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
1224///   │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
1225/// ┃                                          │                    │                     ┃
1226///   │                   │ │                    │                    │                 │
1227/// ┃                                          │                    │                     ┃
1228///   │                   │ │                    │                    │                 │
1229/// ┃                                          │                    │                     ┃
1230///   │                   │ │                    │                    │                 │
1231/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1232///      DataFusion           DataFusion           DataFusion           DataFusion
1233/// ┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
1234///  ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1235///
1236///                                      DataSourceExec
1237/// ```
1238///
1239/// However, when more than 1 file is assigned to each partition, each
1240/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
1241/// file is scanned, the same values for A, B and C can be repeated in
1242/// the same sorted stream
1243///
1244///```text
1245/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1246///   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1247/// ┃   ┌───────────────┐     ┌──────────────┐ │
1248///   │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
1249/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1250///   │ └───────────────┘ │ │ └──────────────┘   ┃
1251/// ┃   ┌───────────────┐     ┌──────────────┐ │
1252///   │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
1253/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1254///   │ └───────────────┘ │ │ └──────────────┘   ┃
1255/// ┃                                          │
1256///   │                   │ │                    ┃
1257/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
1258///      DataFusion           DataFusion         ┃
1259/// ┃    Partition 1          Partition 2
1260///  ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
1261///
1262///              DataSourceExec
1263/// ```
1264fn get_projected_output_ordering(
1265    base_config: &FileScanConfig,
1266    projected_schema: &SchemaRef,
1267) -> Vec<LexOrdering> {
1268    let projected_orderings =
1269        project_orderings(&base_config.output_ordering, projected_schema);
1270
1271    let mut all_orderings = vec![];
1272    for new_ordering in projected_orderings {
1273        // Check if any file groups are not sorted
1274        if base_config.file_groups.iter().any(|group| {
1275            if group.len() <= 1 {
1276                // File groups with <= 1 files are always sorted
1277                return false;
1278            }
1279
1280            let Some(indices) = base_config
1281                .file_source
1282                .projection()
1283                .as_ref()
1284                .map(|p| ordered_column_indices_from_projection(p))
1285            else {
1286                // Can't determine if ordered without a simple projection
1287                return true;
1288            };
1289
1290            let statistics = match MinMaxStatistics::new_from_files(
1291                &new_ordering,
1292                projected_schema,
1293                indices.as_deref(),
1294                group.iter(),
1295            ) {
1296                Ok(statistics) => statistics,
1297                Err(e) => {
1298                    log::trace!("Error fetching statistics for file group: {e}");
1299                    // we can't prove that it's ordered, so we have to reject it
1300                    return true;
1301                }
1302            };
1303
1304            !statistics.is_sorted()
1305        }) {
1306            debug!(
1307                "Skipping specified output ordering {:?}. \
1308                Some file groups couldn't be determined to be sorted: {:?}",
1309                base_config.output_ordering[0], base_config.file_groups
1310            );
1311            continue;
1312        }
1313
1314        all_orderings.push(new_ordering);
1315    }
1316    all_orderings
1317}
1318
1319/// Convert type to a type suitable for use as a `ListingTable`
1320/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
1321/// a reasonable trade off between a reasonable number of partition
1322/// values and space efficiency.
1323///
1324/// This use this to specify types for partition columns. However
1325/// you MAY also choose not to dictionary-encode the data or to use a
1326/// different dictionary type.
1327///
1328/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same say.
1329pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1330    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1331}
1332
1333/// Convert a [`ScalarValue`] of partition columns to a type, as
1334/// described in the documentation of [`wrap_partition_type_in_dict`],
1335/// which can wrap the types.
1336pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1337    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342    use std::collections::HashMap;
1343
1344    use super::*;
1345    use crate::TableSchema;
1346    use crate::test_util::col;
1347    use crate::{
1348        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1349        verify_sort_integrity,
1350    };
1351
1352    use arrow::datatypes::Field;
1353    use datafusion_common::stats::Precision;
1354    use datafusion_common::{ColumnStatistics, internal_err};
1355    use datafusion_expr::{Operator, SortExpr};
1356    use datafusion_physical_expr::create_physical_sort_expr;
1357    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
1358    use datafusion_physical_expr::projection::ProjectionExpr;
1359    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
1360
1361    #[test]
1362    fn physical_plan_config_no_projection_tab_cols_as_field() {
1363        let file_schema = aggr_test_schema();
1364
1365        // make a table_partition_col as a field
1366        let table_partition_col =
1367            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1368                .with_metadata(HashMap::from_iter(vec![(
1369                    "key_whatever".to_owned(),
1370                    "value_whatever".to_owned(),
1371                )]));
1372
1373        let conf = config_for_projection(
1374            Arc::clone(&file_schema),
1375            None,
1376            Statistics::new_unknown(&file_schema),
1377            vec![table_partition_col.clone()],
1378        );
1379
1380        // verify the proj_schema includes the last column and exactly the same the field it is defined
1381        let proj_schema = conf.projected_schema().unwrap();
1382        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1383        assert_eq!(
1384            *proj_schema.field(file_schema.fields().len()),
1385            table_partition_col,
1386            "partition columns are the last columns and ust have all values defined in created field"
1387        );
1388    }
1389
1390    #[test]
1391    fn test_split_groups_by_statistics() -> Result<()> {
1392        use chrono::TimeZone;
1393        use datafusion_common::DFSchema;
1394        use datafusion_expr::execution_props::ExecutionProps;
1395        use object_store::{ObjectMeta, path::Path};
1396
1397        struct File {
1398            name: &'static str,
1399            date: &'static str,
1400            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1401        }
1402        impl File {
1403            fn new(
1404                name: &'static str,
1405                date: &'static str,
1406                statistics: Vec<Option<(f64, f64)>>,
1407            ) -> Self {
1408                Self::new_nullable(
1409                    name,
1410                    date,
1411                    statistics
1412                        .into_iter()
1413                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1414                        .collect(),
1415                )
1416            }
1417
1418            fn new_nullable(
1419                name: &'static str,
1420                date: &'static str,
1421                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1422            ) -> Self {
1423                Self {
1424                    name,
1425                    date,
1426                    statistics,
1427                }
1428            }
1429        }
1430
1431        struct TestCase {
1432            name: &'static str,
1433            file_schema: Schema,
1434            files: Vec<File>,
1435            sort: Vec<SortExpr>,
1436            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1437        }
1438
1439        use datafusion_expr::col;
1440        let cases = vec![
1441            TestCase {
1442                name: "test sort",
1443                file_schema: Schema::new(vec![Field::new(
1444                    "value".to_string(),
1445                    DataType::Float64,
1446                    false,
1447                )]),
1448                files: vec![
1449                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1450                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1451                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1452                ],
1453                sort: vec![col("value").sort(true, false)],
1454                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1455            },
1456            // same input but file '2' is in the middle
1457            // test that we still order correctly
1458            TestCase {
1459                name: "test sort with files ordered differently",
1460                file_schema: Schema::new(vec![Field::new(
1461                    "value".to_string(),
1462                    DataType::Float64,
1463                    false,
1464                )]),
1465                files: vec![
1466                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1467                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1468                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1469                ],
1470                sort: vec![col("value").sort(true, false)],
1471                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1472            },
1473            TestCase {
1474                name: "reverse sort",
1475                file_schema: Schema::new(vec![Field::new(
1476                    "value".to_string(),
1477                    DataType::Float64,
1478                    false,
1479                )]),
1480                files: vec![
1481                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1482                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1483                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1484                ],
1485                sort: vec![col("value").sort(false, true)],
1486                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1487            },
1488            TestCase {
1489                name: "nullable sort columns, nulls last",
1490                file_schema: Schema::new(vec![Field::new(
1491                    "value".to_string(),
1492                    DataType::Float64,
1493                    true,
1494                )]),
1495                files: vec![
1496                    File::new_nullable(
1497                        "0",
1498                        "2023-01-01",
1499                        vec![Some((Some(0.00), Some(0.49)))],
1500                    ),
1501                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
1502                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
1503                ],
1504                sort: vec![col("value").sort(true, false)],
1505                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1506            },
1507            TestCase {
1508                name: "nullable sort columns, nulls first",
1509                file_schema: Schema::new(vec![Field::new(
1510                    "value".to_string(),
1511                    DataType::Float64,
1512                    true,
1513                )]),
1514                files: vec![
1515                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
1516                    File::new_nullable(
1517                        "1",
1518                        "2023-01-01",
1519                        vec![Some((Some(0.50), Some(1.00)))],
1520                    ),
1521                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
1522                ],
1523                sort: vec![col("value").sort(true, true)],
1524                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1525            },
1526            TestCase {
1527                name: "all three non-overlapping",
1528                file_schema: Schema::new(vec![Field::new(
1529                    "value".to_string(),
1530                    DataType::Float64,
1531                    false,
1532                )]),
1533                files: vec![
1534                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1535                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
1536                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
1537                ],
1538                sort: vec![col("value").sort(true, false)],
1539                expected_result: Ok(vec![vec!["0", "1", "2"]]),
1540            },
1541            TestCase {
1542                name: "all three overlapping",
1543                file_schema: Schema::new(vec![Field::new(
1544                    "value".to_string(),
1545                    DataType::Float64,
1546                    false,
1547                )]),
1548                files: vec![
1549                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1550                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1551                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
1552                ],
1553                sort: vec![col("value").sort(true, false)],
1554                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
1555            },
1556            TestCase {
1557                name: "empty input",
1558                file_schema: Schema::new(vec![Field::new(
1559                    "value".to_string(),
1560                    DataType::Float64,
1561                    false,
1562                )]),
1563                files: vec![],
1564                sort: vec![col("value").sort(true, false)],
1565                expected_result: Ok(vec![]),
1566            },
1567            TestCase {
1568                name: "one file missing statistics",
1569                file_schema: Schema::new(vec![Field::new(
1570                    "value".to_string(),
1571                    DataType::Float64,
1572                    false,
1573                )]),
1574                files: vec![
1575                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1576                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1577                    File::new("2", "2023-01-02", vec![None]),
1578                ],
1579                sort: vec![col("value").sort(true, false)],
1580                expected_result: Err(
1581                    "construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
1582                ),
1583            },
1584        ];
1585
1586        for case in cases {
1587            let table_schema = Arc::new(Schema::new(
1588                case.file_schema
1589                    .fields()
1590                    .clone()
1591                    .into_iter()
1592                    .cloned()
1593                    .chain(Some(Arc::new(Field::new(
1594                        "date".to_string(),
1595                        DataType::Utf8,
1596                        false,
1597                    ))))
1598                    .collect::<Vec<_>>(),
1599            ));
1600            let Some(sort_order) = LexOrdering::new(
1601                case.sort
1602                    .into_iter()
1603                    .map(|expr| {
1604                        create_physical_sort_expr(
1605                            &expr,
1606                            &DFSchema::try_from(Arc::clone(&table_schema))?,
1607                            &ExecutionProps::default(),
1608                        )
1609                    })
1610                    .collect::<Result<Vec<_>>>()?,
1611            ) else {
1612                return internal_err!("This test should always use an ordering");
1613            };
1614
1615            let partitioned_files = FileGroup::new(
1616                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
1617            );
1618            let result = FileScanConfig::split_groups_by_statistics(
1619                &table_schema,
1620                std::slice::from_ref(&partitioned_files),
1621                &sort_order,
1622            );
1623            let results_by_name = result
1624                .as_ref()
1625                .map(|file_groups| {
1626                    file_groups
1627                        .iter()
1628                        .map(|file_group| {
1629                            file_group
1630                                .iter()
1631                                .map(|file| {
1632                                    partitioned_files
1633                                        .iter()
1634                                        .find_map(|f| {
1635                                            if f.object_meta == file.object_meta {
1636                                                Some(
1637                                                    f.object_meta
1638                                                        .location
1639                                                        .as_ref()
1640                                                        .rsplit('/')
1641                                                        .next()
1642                                                        .unwrap()
1643                                                        .trim_end_matches(".parquet"),
1644                                                )
1645                                            } else {
1646                                                None
1647                                            }
1648                                        })
1649                                        .unwrap()
1650                                })
1651                                .collect::<Vec<_>>()
1652                        })
1653                        .collect::<Vec<_>>()
1654                })
1655                .map_err(|e| e.strip_backtrace().leak() as &'static str);
1656
1657            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
1658        }
1659
1660        return Ok(());
1661
1662        impl From<File> for PartitionedFile {
1663            fn from(file: File) -> Self {
1664                PartitionedFile {
1665                    object_meta: ObjectMeta {
1666                        location: Path::from(format!(
1667                            "data/date={}/{}.parquet",
1668                            file.date, file.name
1669                        )),
1670                        last_modified: chrono::Utc.timestamp_nanos(0),
1671                        size: 0,
1672                        e_tag: None,
1673                        version: None,
1674                    },
1675                    partition_values: vec![ScalarValue::from(file.date)],
1676                    range: None,
1677                    statistics: Some(Arc::new(Statistics {
1678                        num_rows: Precision::Absent,
1679                        total_byte_size: Precision::Absent,
1680                        column_statistics: file
1681                            .statistics
1682                            .into_iter()
1683                            .map(|stats| {
1684                                stats
1685                                    .map(|(min, max)| ColumnStatistics {
1686                                        min_value: Precision::Exact(
1687                                            ScalarValue::Float64(min),
1688                                        ),
1689                                        max_value: Precision::Exact(
1690                                            ScalarValue::Float64(max),
1691                                        ),
1692                                        ..Default::default()
1693                                    })
1694                                    .unwrap_or_default()
1695                            })
1696                            .collect::<Vec<_>>(),
1697                    })),
1698                    extensions: None,
1699                    metadata_size_hint: None,
1700                }
1701            }
1702        }
1703    }
1704
1705    // sets default for configs that play no role in projections
1706    fn config_for_projection(
1707        file_schema: SchemaRef,
1708        projection: Option<Vec<usize>>,
1709        statistics: Statistics,
1710        table_partition_cols: Vec<Field>,
1711    ) -> FileScanConfig {
1712        let table_schema = TableSchema::new(
1713            file_schema,
1714            table_partition_cols.into_iter().map(Arc::new).collect(),
1715        );
1716        FileScanConfigBuilder::new(
1717            ObjectStoreUrl::parse("test:///").unwrap(),
1718            Arc::new(MockSource::new(table_schema.clone())),
1719        )
1720        .with_projection_indices(projection)
1721        .unwrap()
1722        .with_statistics(statistics)
1723        .build()
1724    }
1725
1726    #[test]
1727    fn test_file_scan_config_builder() {
1728        let file_schema = aggr_test_schema();
1729        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1730
1731        let table_schema = TableSchema::new(
1732            Arc::clone(&file_schema),
1733            vec![Arc::new(Field::new(
1734                "date",
1735                wrap_partition_type_in_dict(DataType::Utf8),
1736                false,
1737            ))],
1738        );
1739
1740        let file_source: Arc<dyn FileSource> =
1741            Arc::new(MockSource::new(table_schema.clone()));
1742
1743        // Create a builder with required parameters
1744        let builder = FileScanConfigBuilder::new(
1745            object_store_url.clone(),
1746            Arc::clone(&file_source),
1747        );
1748
1749        // Build with various configurations
1750        let config = builder
1751            .with_limit(Some(1000))
1752            .with_projection_indices(Some(vec![0, 1]))
1753            .unwrap()
1754            .with_statistics(Statistics::new_unknown(&file_schema))
1755            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
1756                "test.parquet".to_string(),
1757                1024,
1758            )])])
1759            .with_output_ordering(vec![
1760                [PhysicalSortExpr::new_default(Arc::new(Column::new(
1761                    "date", 0,
1762                )))]
1763                .into(),
1764            ])
1765            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
1766            .build();
1767
1768        // Verify the built config has all the expected values
1769        assert_eq!(config.object_store_url, object_store_url);
1770        assert_eq!(*config.file_schema(), file_schema);
1771        assert_eq!(config.limit, Some(1000));
1772        assert_eq!(
1773            config
1774                .file_source
1775                .projection()
1776                .as_ref()
1777                .map(|p| p.column_indices()),
1778            Some(vec![0, 1])
1779        );
1780        assert_eq!(config.table_partition_cols().len(), 1);
1781        assert_eq!(config.table_partition_cols()[0].name(), "date");
1782        assert_eq!(config.file_groups.len(), 1);
1783        assert_eq!(config.file_groups[0].len(), 1);
1784        assert_eq!(
1785            config.file_groups[0][0].object_meta.location.as_ref(),
1786            "test.parquet"
1787        );
1788        assert_eq!(
1789            config.file_compression_type,
1790            FileCompressionType::UNCOMPRESSED
1791        );
1792        assert_eq!(config.output_ordering.len(), 1);
1793    }
1794
1795    #[test]
1796    fn equivalence_properties_after_schema_change() {
1797        let file_schema = aggr_test_schema();
1798        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1799
1800        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1801
1802        // Create a file source with a filter
1803        let file_source: Arc<dyn FileSource> = Arc::new(
1804            MockSource::new(table_schema.clone()).with_filter(Arc::new(BinaryExpr::new(
1805                col("c2", &file_schema).unwrap(),
1806                Operator::Eq,
1807                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1808            ))),
1809        );
1810
1811        let config = FileScanConfigBuilder::new(
1812            object_store_url.clone(),
1813            Arc::clone(&file_source),
1814        )
1815        .with_projection_indices(Some(vec![0, 1, 2]))
1816        .unwrap()
1817        .build();
1818
1819        // Simulate projection being updated. Since the filter has already been pushed down,
1820        // the new projection won't include the filtered column.
1821        let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
1822            col("c1", &file_schema).unwrap(),
1823            "c1",
1824        )]);
1825        let data_source = config
1826            .try_swapping_with_projection(&exprs)
1827            .unwrap()
1828            .unwrap();
1829
1830        // Gather the equivalence properties from the new data source. There should
1831        // be no equivalence class for column c2 since it was removed by the projection.
1832        let eq_properties = data_source.eq_properties();
1833        let eq_group = eq_properties.eq_group();
1834
1835        for class in eq_group.iter() {
1836            for expr in class.iter() {
1837                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1838                    assert_ne!(
1839                        col.name(),
1840                        "c2",
1841                        "c2 should not be present in any equivalence class"
1842                    );
1843                }
1844            }
1845        }
1846    }
1847
1848    #[test]
1849    fn test_file_scan_config_builder_defaults() {
1850        let file_schema = aggr_test_schema();
1851        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1852
1853        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1854
1855        let file_source: Arc<dyn FileSource> =
1856            Arc::new(MockSource::new(table_schema.clone()));
1857
1858        // Create a builder with only required parameters and build without any additional configurations
1859        let config = FileScanConfigBuilder::new(
1860            object_store_url.clone(),
1861            Arc::clone(&file_source),
1862        )
1863        .build();
1864
1865        // Verify default values
1866        assert_eq!(config.object_store_url, object_store_url);
1867        assert_eq!(*config.file_schema(), file_schema);
1868        assert_eq!(config.limit, None);
1869        // When no projection is specified, the file source should have an unprojected projection
1870        // (i.e., all columns)
1871        let expected_projection: Vec<usize> = (0..file_schema.fields().len()).collect();
1872        assert_eq!(
1873            config
1874                .file_source
1875                .projection()
1876                .as_ref()
1877                .map(|p| p.column_indices()),
1878            Some(expected_projection)
1879        );
1880        assert!(config.table_partition_cols().is_empty());
1881        assert!(config.file_groups.is_empty());
1882        assert_eq!(
1883            config.file_compression_type,
1884            FileCompressionType::UNCOMPRESSED
1885        );
1886        assert!(config.output_ordering.is_empty());
1887        assert!(config.constraints.is_empty());
1888
1889        // Verify statistics are set to unknown
1890        assert_eq!(config.statistics().num_rows, Precision::Absent);
1891        assert_eq!(config.statistics().total_byte_size, Precision::Absent);
1892        assert_eq!(
1893            config.statistics().column_statistics.len(),
1894            file_schema.fields().len()
1895        );
1896        for stat in config.statistics().column_statistics {
1897            assert_eq!(stat.distinct_count, Precision::Absent);
1898            assert_eq!(stat.min_value, Precision::Absent);
1899            assert_eq!(stat.max_value, Precision::Absent);
1900            assert_eq!(stat.null_count, Precision::Absent);
1901        }
1902    }
1903
1904    #[test]
1905    fn test_file_scan_config_builder_new_from() {
1906        let schema = aggr_test_schema();
1907        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1908        let partition_cols = vec![Field::new(
1909            "date",
1910            wrap_partition_type_in_dict(DataType::Utf8),
1911            false,
1912        )];
1913        let file = PartitionedFile::new("test_file.parquet", 100);
1914
1915        let table_schema = TableSchema::new(
1916            Arc::clone(&schema),
1917            partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
1918        );
1919
1920        let file_source: Arc<dyn FileSource> =
1921            Arc::new(MockSource::new(table_schema.clone()));
1922
1923        // Create a config with non-default values
1924        let original_config = FileScanConfigBuilder::new(
1925            object_store_url.clone(),
1926            Arc::clone(&file_source),
1927        )
1928        .with_projection_indices(Some(vec![0, 2]))
1929        .unwrap()
1930        .with_limit(Some(10))
1931        .with_file(file.clone())
1932        .with_constraints(Constraints::default())
1933        .build();
1934
1935        // Create a new builder from the config
1936        let new_builder = FileScanConfigBuilder::from(original_config);
1937
1938        // Build a new config from this builder
1939        let new_config = new_builder.build();
1940
1941        // Verify properties match
1942        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
1943        assert_eq!(new_config.object_store_url, object_store_url);
1944        assert_eq!(*new_config.file_schema(), schema);
1945        assert_eq!(
1946            new_config
1947                .file_source
1948                .projection()
1949                .as_ref()
1950                .map(|p| p.column_indices()),
1951            Some(vec![0, 2])
1952        );
1953        assert_eq!(new_config.limit, Some(10));
1954        assert_eq!(*new_config.table_partition_cols(), partition_cols);
1955        assert_eq!(new_config.file_groups.len(), 1);
1956        assert_eq!(new_config.file_groups[0].len(), 1);
1957        assert_eq!(
1958            new_config.file_groups[0][0].object_meta.location.as_ref(),
1959            "test_file.parquet"
1960        );
1961        assert_eq!(new_config.constraints, Constraints::default());
1962    }
1963
1964    #[test]
1965    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
1966        use datafusion_common::DFSchema;
1967        use datafusion_expr::{col, execution_props::ExecutionProps};
1968
1969        let schema = Arc::new(Schema::new(vec![Field::new(
1970            "value",
1971            DataType::Float64,
1972            false,
1973        )]));
1974
1975        // Setup sort expression
1976        let exec_props = ExecutionProps::new();
1977        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
1978        let sort_expr = [col("value").sort(true, false)];
1979        let sort_ordering = sort_expr
1980            .map(|expr| {
1981                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
1982            })
1983            .into();
1984
1985        // Test case parameters
1986        struct TestCase {
1987            name: String,
1988            file_count: usize,
1989            overlap_factor: f64,
1990            target_partitions: usize,
1991            expected_partition_count: usize,
1992        }
1993
1994        let test_cases = vec![
1995            // Basic cases
1996            TestCase {
1997                name: "no_overlap_10_files_4_partitions".to_string(),
1998                file_count: 10,
1999                overlap_factor: 0.0,
2000                target_partitions: 4,
2001                expected_partition_count: 4,
2002            },
2003            TestCase {
2004                name: "medium_overlap_20_files_5_partitions".to_string(),
2005                file_count: 20,
2006                overlap_factor: 0.5,
2007                target_partitions: 5,
2008                expected_partition_count: 5,
2009            },
2010            TestCase {
2011                name: "high_overlap_30_files_3_partitions".to_string(),
2012                file_count: 30,
2013                overlap_factor: 0.8,
2014                target_partitions: 3,
2015                expected_partition_count: 7,
2016            },
2017            // Edge cases
2018            TestCase {
2019                name: "fewer_files_than_partitions".to_string(),
2020                file_count: 3,
2021                overlap_factor: 0.0,
2022                target_partitions: 10,
2023                expected_partition_count: 3, // Should only create as many partitions as files
2024            },
2025            TestCase {
2026                name: "single_file".to_string(),
2027                file_count: 1,
2028                overlap_factor: 0.0,
2029                target_partitions: 5,
2030                expected_partition_count: 1, // Should create only one partition
2031            },
2032            TestCase {
2033                name: "empty_files".to_string(),
2034                file_count: 0,
2035                overlap_factor: 0.0,
2036                target_partitions: 3,
2037                expected_partition_count: 0, // Empty result for empty input
2038            },
2039        ];
2040
2041        for case in test_cases {
2042            println!("Running test case: {}", case.name);
2043
2044            // Generate files using bench utility function
2045            let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2046
2047            // Call the function under test
2048            let result =
2049                FileScanConfig::split_groups_by_statistics_with_target_partitions(
2050                    &schema,
2051                    &file_groups,
2052                    &sort_ordering,
2053                    case.target_partitions,
2054                )?;
2055
2056            // Verify results
2057            println!(
2058                "Created {} partitions (target was {})",
2059                result.len(),
2060                case.target_partitions
2061            );
2062
2063            // Check partition count
2064            assert_eq!(
2065                result.len(),
2066                case.expected_partition_count,
2067                "Case '{}': Unexpected partition count",
2068                case.name
2069            );
2070
2071            // Verify sort integrity
2072            assert!(
2073                verify_sort_integrity(&result),
2074                "Case '{}': Files within partitions are not properly ordered",
2075                case.name
2076            );
2077
2078            // Distribution check for partitions
2079            if case.file_count > 1 && case.expected_partition_count > 1 {
2080                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2081                let max_size = *group_sizes.iter().max().unwrap();
2082                let min_size = *group_sizes.iter().min().unwrap();
2083
2084                // Check partition balancing - difference shouldn't be extreme
2085                let avg_files_per_partition =
2086                    case.file_count as f64 / case.expected_partition_count as f64;
2087                assert!(
2088                    (max_size as f64) < 2.0 * avg_files_per_partition,
2089                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2090                    case.name,
2091                    max_size,
2092                    avg_files_per_partition
2093                );
2094
2095                println!("Distribution - min files: {min_size}, max files: {max_size}");
2096            }
2097        }
2098
2099        // Test error case: zero target partitions
2100        let empty_groups: Vec<FileGroup> = vec![];
2101        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2102            &schema,
2103            &empty_groups,
2104            &sort_ordering,
2105            0,
2106        )
2107        .unwrap_err();
2108
2109        assert!(
2110            err.to_string()
2111                .contains("target_partitions must be greater than 0"),
2112            "Expected error for zero target partitions"
2113        );
2114
2115        Ok(())
2116    }
2117
2118    #[test]
2119    fn test_partition_statistics_projection() {
2120        // This test verifies that partition_statistics applies projection correctly.
2121        // The old implementation had a bug where it returned file group statistics
2122        // without applying the projection, returning all column statistics instead
2123        // of just the projected ones.
2124
2125        use crate::source::DataSourceExec;
2126        use datafusion_physical_plan::ExecutionPlan;
2127
2128        // Create a schema with 4 columns
2129        let schema = Arc::new(Schema::new(vec![
2130            Field::new("col0", DataType::Int32, false),
2131            Field::new("col1", DataType::Int32, false),
2132            Field::new("col2", DataType::Int32, false),
2133            Field::new("col3", DataType::Int32, false),
2134        ]));
2135
2136        // Create statistics for all 4 columns
2137        let file_group_stats = Statistics {
2138            num_rows: Precision::Exact(100),
2139            total_byte_size: Precision::Exact(1024),
2140            column_statistics: vec![
2141                ColumnStatistics {
2142                    null_count: Precision::Exact(0),
2143                    ..ColumnStatistics::new_unknown()
2144                },
2145                ColumnStatistics {
2146                    null_count: Precision::Exact(5),
2147                    ..ColumnStatistics::new_unknown()
2148                },
2149                ColumnStatistics {
2150                    null_count: Precision::Exact(10),
2151                    ..ColumnStatistics::new_unknown()
2152                },
2153                ColumnStatistics {
2154                    null_count: Precision::Exact(15),
2155                    ..ColumnStatistics::new_unknown()
2156                },
2157            ],
2158        };
2159
2160        // Create a file group with statistics
2161        let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2162            .with_statistics(Arc::new(file_group_stats));
2163
2164        let table_schema = TableSchema::new(Arc::clone(&schema), vec![]);
2165
2166        // Create a FileScanConfig with projection: only keep columns 0 and 2
2167        let config = FileScanConfigBuilder::new(
2168            ObjectStoreUrl::parse("test:///").unwrap(),
2169            Arc::new(MockSource::new(table_schema.clone())),
2170        )
2171        .with_projection_indices(Some(vec![0, 2]))
2172        .unwrap() // Only project columns 0 and 2
2173        .with_file_groups(vec![file_group])
2174        .build();
2175
2176        // Create a DataSourceExec from the config
2177        let exec = DataSourceExec::from_data_source(config);
2178
2179        // Get statistics for partition 0
2180        let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2181
2182        // Verify that only 2 columns are in the statistics (the projected ones)
2183        assert_eq!(
2184            partition_stats.column_statistics.len(),
2185            2,
2186            "Expected 2 column statistics (projected), but got {}",
2187            partition_stats.column_statistics.len()
2188        );
2189
2190        // Verify the column statistics are for columns 0 and 2
2191        assert_eq!(
2192            partition_stats.column_statistics[0].null_count,
2193            Precision::Exact(0),
2194            "First projected column should be col0 with 0 nulls"
2195        );
2196        assert_eq!(
2197            partition_stats.column_statistics[1].null_count,
2198            Precision::Exact(10),
2199            "Second projected column should be col2 with 10 nulls"
2200        );
2201
2202        // Verify row count and byte size
2203        assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2204        assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
2205    }
2206
2207    #[test]
2208    fn test_output_partitioning_not_partitioned_by_file_group() {
2209        let file_schema = aggr_test_schema();
2210        let partition_col =
2211            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), false);
2212
2213        let config = config_for_projection(
2214            Arc::clone(&file_schema),
2215            None,
2216            Statistics::new_unknown(&file_schema),
2217            vec![partition_col],
2218        );
2219
2220        // partitioned_by_file_group defaults to false
2221        let partitioning = config.output_partitioning();
2222        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2223    }
2224
2225    #[test]
2226    fn test_output_partitioning_no_partition_columns() {
2227        let file_schema = aggr_test_schema();
2228        let mut config = config_for_projection(
2229            Arc::clone(&file_schema),
2230            None,
2231            Statistics::new_unknown(&file_schema),
2232            vec![], // No partition columns
2233        );
2234        config.partitioned_by_file_group = true;
2235
2236        let partitioning = config.output_partitioning();
2237        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2238    }
2239
2240    #[test]
2241    fn test_output_partitioning_with_partition_columns() {
2242        let file_schema = aggr_test_schema();
2243
2244        // Test single partition column
2245        let single_partition_col = vec![Field::new(
2246            "date",
2247            wrap_partition_type_in_dict(DataType::Utf8),
2248            false,
2249        )];
2250
2251        let mut config = config_for_projection(
2252            Arc::clone(&file_schema),
2253            None,
2254            Statistics::new_unknown(&file_schema),
2255            single_partition_col,
2256        );
2257        config.partitioned_by_file_group = true;
2258        config.file_groups = vec![
2259            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2260            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2261            FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
2262        ];
2263
2264        let partitioning = config.output_partitioning();
2265        match partitioning {
2266            Partitioning::Hash(exprs, num_partitions) => {
2267                assert_eq!(num_partitions, 3);
2268                assert_eq!(exprs.len(), 1);
2269                assert_eq!(
2270                    exprs[0].as_any().downcast_ref::<Column>().unwrap().name(),
2271                    "date"
2272                );
2273            }
2274            _ => panic!("Expected Hash partitioning"),
2275        }
2276
2277        // Test multiple partition columns
2278        let multiple_partition_cols = vec![
2279            Field::new("year", wrap_partition_type_in_dict(DataType::Utf8), false),
2280            Field::new("month", wrap_partition_type_in_dict(DataType::Utf8), false),
2281        ];
2282
2283        config = config_for_projection(
2284            Arc::clone(&file_schema),
2285            None,
2286            Statistics::new_unknown(&file_schema),
2287            multiple_partition_cols,
2288        );
2289        config.partitioned_by_file_group = true;
2290        config.file_groups = vec![
2291            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2292            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2293        ];
2294
2295        let partitioning = config.output_partitioning();
2296        match partitioning {
2297            Partitioning::Hash(exprs, num_partitions) => {
2298                assert_eq!(num_partitions, 2);
2299                assert_eq!(exprs.len(), 2);
2300                let col_names: Vec<_> = exprs
2301                    .iter()
2302                    .map(|e| e.as_any().downcast_ref::<Column>().unwrap().name())
2303                    .collect();
2304                assert_eq!(col_names, vec!["year", "month"]);
2305            }
2306            _ => panic!("Expected Hash partitioning"),
2307        }
2308    }
2309}