datafusion_datasource/
file_scan_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21use std::{
22    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
24};
25
26use crate::file_groups::FileGroup;
27#[allow(unused_imports)]
28use crate::schema_adapter::SchemaAdapterFactory;
29use crate::{
30    display::FileGroupsDisplay, file::FileSource,
31    file_compression_type::FileCompressionType, file_stream::FileStream,
32    source::DataSource, statistics::MinMaxStatistics, PartitionedFile,
33};
34use arrow::datatypes::FieldRef;
35use arrow::{
36    array::{
37        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
38        RecordBatchOptions,
39    },
40    buffer::Buffer,
41    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
42};
43use datafusion_common::config::ConfigOptions;
44use datafusion_common::{
45    exec_err, ColumnStatistics, Constraints, DataFusionError, Result, ScalarValue,
46    Statistics,
47};
48use datafusion_execution::{
49    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
50};
51use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_columns};
52use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
53use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
54use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
55use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
56use datafusion_physical_plan::projection::ProjectionExpr;
57use datafusion_physical_plan::{
58    display::{display_orderings, ProjectSchemaDisplay},
59    metrics::ExecutionPlanMetricsSet,
60    projection::{all_alias_free_columns, new_projections_for_columns},
61    DisplayAs, DisplayFormatType,
62};
63use datafusion_physical_plan::{
64    filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
65};
66
67use datafusion_physical_plan::coop::cooperative;
68use datafusion_physical_plan::execution_plan::SchedulingType;
69use log::{debug, warn};
70
/// The base configuration for a [`DataSourceExec`], the physical plan for
/// any given file format.
73///
/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a `FileScanConfig`.
75///
76/// # Example
77/// ```
78/// # use std::any::Any;
79/// # use std::sync::Arc;
80/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
81/// # use object_store::ObjectStore;
82/// # use datafusion_common::Statistics;
83/// # use datafusion_common::Result;
84/// # use datafusion_datasource::file::FileSource;
85/// # use datafusion_datasource::file_groups::FileGroup;
86/// # use datafusion_datasource::PartitionedFile;
87/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
88/// # use datafusion_datasource::file_stream::FileOpener;
89/// # use datafusion_datasource::source::DataSourceExec;
90/// # use datafusion_execution::object_store::ObjectStoreUrl;
91/// # use datafusion_physical_plan::ExecutionPlan;
92/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
93/// # use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
94/// # let file_schema = Arc::new(Schema::new(vec![
95/// #  Field::new("c1", DataType::Int32, false),
96/// #  Field::new("c2", DataType::Int32, false),
97/// #  Field::new("c3", DataType::Int32, false),
98/// #  Field::new("c4", DataType::Int32, false),
99/// # ]));
/// # // Note: create a mock ParquetSource, as ParquetSource is not in the datasource crate
101/// #[derive(Clone)]
102/// # struct ParquetSource {
103/// #    projected_statistics: Option<Statistics>,
104/// #    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>
105/// # };
106/// # impl FileSource for ParquetSource {
107/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
108/// #  fn as_any(&self) -> &dyn Any { self  }
109/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
110/// #  fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
111/// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
112/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
113/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
114/// #  fn statistics(&self) -> Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
115/// #  fn file_type(&self) -> &str { "parquet" }
116/// #  fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Result<Arc<dyn FileSource>> { Ok(Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} )) }
117/// #  fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { self.schema_adapter_factory.clone() }
118/// #  }
119/// # impl ParquetSource {
120/// #  fn new() -> Self { Self {projected_statistics: None, schema_adapter_factory: None} }
121/// # }
122/// // create FileScan config for reading parquet files from file://
123/// let object_store_url = ObjectStoreUrl::local_filesystem();
124/// let file_source = Arc::new(ParquetSource::new());
125/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
126///   .with_limit(Some(1000))            // read only the first 1000 records
127///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
128///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
129///   .with_file(PartitionedFile::new("file1.parquet", 1234))
///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
///   // in a single file group
132///   .with_file_group(FileGroup::new(vec![
133///    PartitionedFile::new("file2.parquet", 56),
134///    PartitionedFile::new("file3.parquet", 78),
135///   ])).build();
136/// // create an execution plan from the config
137/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
138/// ```
139///
140/// [`DataSourceExec`]: crate::source::DataSourceExec
141/// [`DataSourceExec::from_data_source`]: crate::source::DataSourceExec::from_data_source
#[derive(Clone)]
pub struct FileScanConfig {
    /// Object store URL, used to get an [`ObjectStore`] instance from
    /// [`RuntimeEnv::object_store`]
    ///
    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
    /// as `file://` or `s3://my_bucket`. It should not include the path to the
    /// file itself. The relevant URL prefix must be registered via
    /// [`RuntimeEnv::register_object_store`]
    ///
    /// [`ObjectStore`]: object_store::ObjectStore
    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
    pub object_store_url: ObjectStoreUrl,
    /// Schema before `projection` is applied. It contains all the columns that
    /// may appear in the files. It does not include table partition columns
    /// that may be added.
    /// Note that this is **not** the schema of the physical files.
    /// This is the schema that the physical file schema will be
    /// mapped onto, and the schema that the [`DataSourceExec`] will return.
    ///
    /// [`DataSourceExec`]: crate::source::DataSourceExec
    pub file_schema: SchemaRef,
    /// List of files to be processed, grouped into partitions
    ///
    /// Each file must have a schema of `file_schema` or a subset. If
    /// a particular file has a subset, the missing columns are
    /// padded with NULLs.
    ///
    /// DataFusion may attempt to read each partition of files
    /// concurrently, however files *within* a partition will be read
    /// sequentially, one after the next.
    pub file_groups: Vec<FileGroup>,
    /// Table constraints
    pub constraints: Constraints,
    /// Columns on which to project the data. Indexes that are equal to or
    /// higher than the number of columns of `file_schema` refer to
    /// `table_partition_cols`. If `None`, all file columns followed by all
    /// partition columns are projected.
    pub projection: Option<Vec<usize>>,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records after filtering are returned.
    pub limit: Option<usize>,
    /// The partitioning columns
    pub table_partition_cols: Vec<FieldRef>,
    /// All equivalent lexicographical orderings that describe the schema.
    pub output_ordering: Vec<LexOrdering>,
    /// File compression type
    pub file_compression_type: FileCompressionType,
    /// Whether new lines in values are supported for CSVOptions
    pub new_lines_in_values: bool,
    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
    pub file_source: Arc<dyn FileSource>,
    /// Batch size while creating new batches.
    /// When `None`, the session's configured batch size is used when the scan
    /// is opened (see [`datafusion_common::config::ExecutionOptions`] batch_size).
    pub batch_size: Option<usize>,
    /// Expression adapter used to adapt filters and projections that are pushed down into the scan
    /// from the logical schema to the physical schema of the file.
    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
200
201/// A builder for [`FileScanConfig`]'s.
202///
203/// Example:
204///
205/// ```rust
206/// # use std::sync::Arc;
207/// # use arrow::datatypes::{DataType, Field, Schema};
208/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig};
209/// # use datafusion_datasource::file_compression_type::FileCompressionType;
210/// # use datafusion_datasource::file_groups::FileGroup;
211/// # use datafusion_datasource::PartitionedFile;
212/// # use datafusion_execution::object_store::ObjectStoreUrl;
213/// # use datafusion_common::Statistics;
214/// # use datafusion_datasource::file::FileSource;
215///
216/// # fn main() {
217/// # fn with_source(file_source: Arc<dyn FileSource>) {
218///     // Create a schema for our Parquet files
219///     let schema = Arc::new(Schema::new(vec![
220///         Field::new("id", DataType::Int32, false),
221///         Field::new("value", DataType::Utf8, false),
222///     ]));
223///
224///     // Create a builder for scanning Parquet files from a local filesystem
225///     let config = FileScanConfigBuilder::new(
226///         ObjectStoreUrl::local_filesystem(),
227///         schema,
228///         file_source,
229///     )
230///     // Set a limit of 1000 rows
231///     .with_limit(Some(1000))
232///     // Project only the first column
233///     .with_projection(Some(vec![0]))
234///     // Add partition columns
235///     .with_table_partition_cols(vec![
236///         Field::new("date", DataType::Utf8, false),
237///     ])
238///     // Add a file group with two files
239///     .with_file_group(FileGroup::new(vec![
240///         PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024),
241///         PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048),
242///     ]))
243///     // Set compression type
244///     .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
245///     // Build the final config
246///     .build();
247/// # }
248/// # }
249/// ```
#[derive(Clone)]
pub struct FileScanConfigBuilder {
    /// URL prefix identifying the object store the files are read from.
    /// See [`FileScanConfig::object_store_url`].
    object_store_url: ObjectStoreUrl,
    /// Table schema before any projections or partition columns are applied.
    ///
    /// This schema is used to read the files, but is **not** necessarily the
    /// schema of the physical files. Rather this is the schema that the
    /// physical file schema will be mapped onto, and the schema that the
    /// [`DataSourceExec`] will return.
    ///
    /// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns.
    ///
    /// This probably would be better named `table_schema`
    ///
    /// [`DataSourceExec`]: crate::source::DataSourceExec
    file_schema: SchemaRef,
    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
    file_source: Arc<dyn FileSource>,
    /// Maximum number of records to read; `None` means no limit.
    limit: Option<usize>,
    /// Column indices to project; indices past the end of `file_schema`
    /// refer to `table_partition_cols`.
    projection: Option<Vec<usize>>,
    /// The partitioning columns.
    table_partition_cols: Vec<FieldRef>,
    /// Table constraints; `build` falls back to `Constraints::default()` when unset.
    constraints: Option<Constraints>,
    /// Files to scan, grouped into partitions.
    file_groups: Vec<FileGroup>,
    /// Statistics handed to the file source by `build`; when unset,
    /// `Statistics::new_unknown` for `file_schema` is used.
    statistics: Option<Statistics>,
    /// All equivalent lexicographical orderings that describe the output.
    output_ordering: Vec<LexOrdering>,
    /// File compression type; `build` falls back to
    /// `FileCompressionType::UNCOMPRESSED` when unset.
    file_compression_type: Option<FileCompressionType>,
    /// Whether newlines in values are supported; `build` falls back to
    /// `false` when unset.
    new_lines_in_values: Option<bool>,
    /// Batch size to use while creating new batches, if overridden.
    batch_size: Option<usize>,
    /// Optional expression adapter for filters and projections pushed down
    /// into the scan.
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
279
280impl FileScanConfigBuilder {
281    /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files.
282    ///
283    /// # Parameters:
284    /// * `object_store_url`: See [`FileScanConfig::object_store_url`]
285    /// * `file_schema`: See [`FileScanConfig::file_schema`]
286    /// * `file_source`: See [`FileScanConfig::file_source`]
287    pub fn new(
288        object_store_url: ObjectStoreUrl,
289        file_schema: SchemaRef,
290        file_source: Arc<dyn FileSource>,
291    ) -> Self {
292        Self {
293            object_store_url,
294            file_schema,
295            file_source,
296            file_groups: vec![],
297            statistics: None,
298            output_ordering: vec![],
299            file_compression_type: None,
300            new_lines_in_values: None,
301            limit: None,
302            projection: None,
303            table_partition_cols: vec![],
304            constraints: None,
305            batch_size: None,
306            expr_adapter_factory: None,
307        }
308    }
309
310    /// Set the maximum number of records to read from this plan. If `None`,
311    /// all records after filtering are returned.
312    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
313        self.limit = limit;
314        self
315    }
316
317    /// Set the file source for scanning files.
318    ///
319    /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
320    /// after the builder has been created.
321    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
322        self.file_source = file_source;
323        self
324    }
325
326    /// Set the columns on which to project the data. Indexes that are higher than the
327    /// number of columns of `file_schema` refer to `table_partition_cols`.
328    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
329        self.projection = projection;
330        self
331    }
332
333    /// Set the partitioning columns
334    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
335        self.table_partition_cols = table_partition_cols
336            .into_iter()
337            .map(|f| Arc::new(f) as FieldRef)
338            .collect();
339        self
340    }
341
342    /// Set the table constraints
343    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
344        self.constraints = Some(constraints);
345        self
346    }
347
348    /// Set the estimated overall statistics of the files, taking `filters` into account.
349    /// Defaults to [`Statistics::new_unknown`].
350    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
351        self.statistics = Some(statistics);
352        self
353    }
354
355    /// Set the list of files to be processed, grouped into partitions.
356    ///
357    /// Each file must have a schema of `file_schema` or a subset. If
358    /// a particular file has a subset, the missing columns are
359    /// padded with NULLs.
360    ///
361    /// DataFusion may attempt to read each partition of files
362    /// concurrently, however files *within* a partition will be read
363    /// sequentially, one after the next.
364    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
365        self.file_groups = file_groups;
366        self
367    }
368
369    /// Add a new file group
370    ///
371    /// See [`Self::with_file_groups`] for more information
372    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
373        self.file_groups.push(file_group);
374        self
375    }
376
377    /// Add a file as a single group
378    ///
379    /// See [`Self::with_file_groups`] for more information.
380    pub fn with_file(self, file: PartitionedFile) -> Self {
381        self.with_file_group(FileGroup::new(vec![file]))
382    }
383
384    /// Set the output ordering of the files
385    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
386        self.output_ordering = output_ordering;
387        self
388    }
389
390    /// Set the file compression type
391    pub fn with_file_compression_type(
392        mut self,
393        file_compression_type: FileCompressionType,
394    ) -> Self {
395        self.file_compression_type = Some(file_compression_type);
396        self
397    }
398
399    /// Set whether new lines in values are supported for CSVOptions
400    ///
401    /// Parsing newlines in quoted values may be affected by execution behaviour such as
402    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
403    /// parsed successfully, which may reduce performance.
404    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
405        self.new_lines_in_values = Some(new_lines_in_values);
406        self
407    }
408
409    /// Set the batch_size property
410    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
411        self.batch_size = batch_size;
412        self
413    }
414
415    /// Register an expression adapter used to adapt filters and projections that are pushed down into the scan
416    /// from the logical schema to the physical schema of the file.
417    /// This can include things like:
418    /// - Column ordering changes
419    /// - Handling of missing columns
420    /// - Rewriting expression to use pre-computed values or file format specific optimizations
421    pub fn with_expr_adapter(
422        mut self,
423        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
424    ) -> Self {
425        self.expr_adapter_factory = expr_adapter;
426        self
427    }
428
429    /// Build the final [`FileScanConfig`] with all the configured settings.
430    ///
431    /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
432    /// Any unset optional fields will use their default values.
433    pub fn build(self) -> FileScanConfig {
434        let Self {
435            object_store_url,
436            file_schema,
437            file_source,
438            limit,
439            projection,
440            table_partition_cols,
441            constraints,
442            file_groups,
443            statistics,
444            output_ordering,
445            file_compression_type,
446            new_lines_in_values,
447            batch_size,
448            expr_adapter_factory: expr_adapter,
449        } = self;
450
451        let constraints = constraints.unwrap_or_default();
452        let statistics =
453            statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
454
455        let file_source = file_source
456            .with_statistics(statistics.clone())
457            .with_schema(Arc::clone(&file_schema));
458        let file_compression_type =
459            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
460        let new_lines_in_values = new_lines_in_values.unwrap_or(false);
461
462        FileScanConfig {
463            object_store_url,
464            file_schema,
465            file_source,
466            limit,
467            projection,
468            table_partition_cols,
469            constraints,
470            file_groups,
471            output_ordering,
472            file_compression_type,
473            new_lines_in_values,
474            batch_size,
475            expr_adapter_factory: expr_adapter,
476        }
477    }
478}
479
480impl From<FileScanConfig> for FileScanConfigBuilder {
481    fn from(config: FileScanConfig) -> Self {
482        Self {
483            object_store_url: config.object_store_url,
484            file_schema: config.file_schema,
485            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
486            file_groups: config.file_groups,
487            statistics: config.file_source.statistics().ok(),
488            output_ordering: config.output_ordering,
489            file_compression_type: Some(config.file_compression_type),
490            new_lines_in_values: Some(config.new_lines_in_values),
491            limit: config.limit,
492            projection: config.projection,
493            table_partition_cols: config.table_partition_cols,
494            constraints: Some(config.constraints),
495            batch_size: config.batch_size,
496            expr_adapter_factory: config.expr_adapter_factory,
497        }
498    }
499}
500
501impl DataSource for FileScanConfig {
502    fn open(
503        &self,
504        partition: usize,
505        context: Arc<TaskContext>,
506    ) -> Result<SendableRecordBatchStream> {
507        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
508        let batch_size = self
509            .batch_size
510            .unwrap_or_else(|| context.session_config().batch_size());
511
512        let source = self
513            .file_source
514            .with_batch_size(batch_size)
515            .with_projection(self);
516
517        let opener = source.create_file_opener(object_store, self, partition);
518
519        let stream = FileStream::new(self, partition, opener, source.metrics())?;
520        Ok(Box::pin(cooperative(stream)))
521    }
522
523    fn as_any(&self) -> &dyn Any {
524        self
525    }
526
527    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
528        match t {
529            DisplayFormatType::Default | DisplayFormatType::Verbose => {
530                let schema = self.projected_schema();
531                let orderings = get_projected_output_ordering(self, &schema);
532
533                write!(f, "file_groups=")?;
534                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
535
536                if !schema.fields().is_empty() {
537                    write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
538                }
539
540                if let Some(limit) = self.limit {
541                    write!(f, ", limit={limit}")?;
542                }
543
544                display_orderings(f, &orderings)?;
545
546                if !self.constraints.is_empty() {
547                    write!(f, ", {}", self.constraints)?;
548                }
549
550                self.fmt_file_source(t, f)
551            }
552            DisplayFormatType::TreeRender => {
553                writeln!(f, "format={}", self.file_source.file_type())?;
554                self.file_source.fmt_extra(t, f)?;
555                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
556                writeln!(f, "files={num_files}")?;
557                Ok(())
558            }
559        }
560    }
561
562    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
563    fn repartitioned(
564        &self,
565        target_partitions: usize,
566        repartition_file_min_size: usize,
567        output_ordering: Option<LexOrdering>,
568    ) -> Result<Option<Arc<dyn DataSource>>> {
569        let source = self.file_source.repartitioned(
570            target_partitions,
571            repartition_file_min_size,
572            output_ordering,
573            self,
574        )?;
575
576        Ok(source.map(|s| Arc::new(s) as _))
577    }
578
579    fn output_partitioning(&self) -> Partitioning {
580        Partitioning::UnknownPartitioning(self.file_groups.len())
581    }
582
583    fn eq_properties(&self) -> EquivalenceProperties {
584        let (schema, constraints, _, orderings) = self.project();
585        let mut eq_properties =
586            EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings)
587                .with_constraints(constraints);
588        if let Some(filter) = self.file_source.filter() {
589            // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
590            // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
591            match reassign_predicate_columns(filter, &schema, true) {
592                Ok(filter) => {
593                    match Self::add_filter_equivalence_info(
594                        filter,
595                        &mut eq_properties,
596                        &schema,
597                    ) {
598                        Ok(()) => {}
599                        Err(e) => {
600                            warn!("Failed to add filter equivalence info: {e}");
601                            #[cfg(debug_assertions)]
602                            panic!("Failed to add filter equivalence info: {e}");
603                        }
604                    }
605                }
606                Err(e) => {
607                    warn!("Failed to reassign predicate columns: {e}");
608                    #[cfg(debug_assertions)]
609                    panic!("Failed to reassign predicate columns: {e}");
610                }
611            };
612        }
613        eq_properties
614    }
615
616    fn scheduling_type(&self) -> SchedulingType {
617        SchedulingType::Cooperative
618    }
619
620    fn statistics(&self) -> Result<Statistics> {
621        Ok(self.projected_stats())
622    }
623
624    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
625        let source = FileScanConfigBuilder::from(self.clone())
626            .with_limit(limit)
627            .build();
628        Some(Arc::new(source))
629    }
630
631    fn fetch(&self) -> Option<usize> {
632        self.limit
633    }
634
635    fn metrics(&self) -> ExecutionPlanMetricsSet {
636        self.file_source.metrics().clone()
637    }
638
639    fn try_swapping_with_projection(
640        &self,
641        projection: &[ProjectionExpr],
642    ) -> Result<Option<Arc<dyn DataSource>>> {
643        // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
644
645        // Must be all column references, with no table partition columns (which can not be projected)
646        let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
647            proj_expr
648                .expr
649                .as_any()
650                .downcast_ref::<Column>()
651                .map(|expr| expr.index() >= self.file_schema.fields().len())
652                .unwrap_or(false)
653        });
654
655        // If there is any non-column or alias-carrier expression, Projection should not be removed.
656        let no_aliases = all_alias_free_columns(projection);
657
658        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
659            let file_scan = self.clone();
660            let source = Arc::clone(&file_scan.file_source);
661            let new_projections = new_projections_for_columns(
662                projection,
663                &file_scan
664                    .projection
665                    .clone()
666                    .unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
667            );
668
669            Arc::new(
670                FileScanConfigBuilder::from(file_scan)
671                    // Assign projected statistics to source
672                    .with_projection(Some(new_projections))
673                    .with_source(source)
674                    .build(),
675            ) as _
676        }))
677    }
678
679    fn try_pushdown_filters(
680        &self,
681        filters: Vec<Arc<dyn PhysicalExpr>>,
682        config: &ConfigOptions,
683    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
684        let result = self.file_source.try_pushdown_filters(filters, config)?;
685        match result.updated_node {
686            Some(new_file_source) => {
687                let file_scan_config = FileScanConfigBuilder::from(self.clone())
688                    .with_source(new_file_source)
689                    .build();
690                Ok(FilterPushdownPropagation {
691                    filters: result.filters,
692                    updated_node: Some(Arc::new(file_scan_config) as _),
693                })
694            }
695            None => {
696                // If the file source does not support filter pushdown, return the original config
697                Ok(FilterPushdownPropagation {
698                    filters: result.filters,
699                    updated_node: None,
700                })
701            }
702        }
703    }
704}
705
706impl FileScanConfig {
707    fn projection_indices(&self) -> Vec<usize> {
708        match &self.projection {
709            Some(proj) => proj.clone(),
710            None => (0..self.file_schema.fields().len()
711                + self.table_partition_cols.len())
712                .collect(),
713        }
714    }
715
716    pub fn projected_stats(&self) -> Statistics {
717        let statistics = self.file_source.statistics().unwrap();
718
719        let table_cols_stats = self
720            .projection_indices()
721            .into_iter()
722            .map(|idx| {
723                if idx < self.file_schema.fields().len() {
724                    statistics.column_statistics[idx].clone()
725                } else {
726                    // TODO provide accurate stat for partition column (#1186)
727                    ColumnStatistics::new_unknown()
728                }
729            })
730            .collect();
731
732        Statistics {
733            num_rows: statistics.num_rows,
734            // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
735            total_byte_size: statistics.total_byte_size,
736            column_statistics: table_cols_stats,
737        }
738    }
739
740    pub fn projected_schema(&self) -> Arc<Schema> {
741        let table_fields: Vec<_> = self
742            .projection_indices()
743            .into_iter()
744            .map(|idx| {
745                if idx < self.file_schema.fields().len() {
746                    self.file_schema.field(idx).clone()
747                } else {
748                    let partition_idx = idx - self.file_schema.fields().len();
749                    Arc::unwrap_or_clone(Arc::clone(
750                        &self.table_partition_cols[partition_idx],
751                    ))
752                }
753            })
754            .collect();
755
756        Arc::new(Schema::new_with_metadata(
757            table_fields,
758            self.file_schema.metadata().clone(),
759        ))
760    }
761
762    fn add_filter_equivalence_info(
763        filter: Arc<dyn PhysicalExpr>,
764        eq_properties: &mut EquivalenceProperties,
765        schema: &Schema,
766    ) -> Result<()> {
767        macro_rules! ignore_dangling_col {
768            ($col:expr) => {
769                if let Some(col) = $col.as_any().downcast_ref::<Column>() {
770                    if schema.index_of(col.name()).is_err() {
771                        continue;
772                    }
773                }
774            };
775        }
776
777        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
778        for (lhs, rhs) in equal_pairs {
779            // Ignore any binary expressions that reference non-existent columns in the current schema
780            // (e.g. due to unnecessary projections being removed)
781            ignore_dangling_col!(lhs);
782            ignore_dangling_col!(rhs);
783            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
784        }
785        Ok(())
786    }
787
788    pub fn projected_constraints(&self) -> Constraints {
789        let indexes = self.projection_indices();
790        self.constraints.project(&indexes).unwrap_or_default()
791    }
792
    /// Returns whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
    ///
    /// This accessor simply returns the flag stored on this config.
    pub fn newlines_in_values(&self) -> bool {
        self.new_lines_in_values
    }
803
804    /// Project the schema, constraints, and the statistics on the given column indices
805    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
806        if self.projection.is_none() && self.table_partition_cols.is_empty() {
807            return (
808                Arc::clone(&self.file_schema),
809                self.constraints.clone(),
810                self.file_source.statistics().unwrap().clone(),
811                self.output_ordering.clone(),
812            );
813        }
814
815        let schema = self.projected_schema();
816        let constraints = self.projected_constraints();
817        let stats = self.projected_stats();
818
819        let output_ordering = get_projected_output_ordering(self, &schema);
820
821        (schema, constraints, stats, output_ordering)
822    }
823
824    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
825        self.projection.as_ref().map(|p| {
826            p.iter()
827                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
828                .map(|col_idx| self.file_schema.field(*col_idx).name())
829                .cloned()
830                .collect()
831        })
832    }
833
834    /// Projects only file schema, ignoring partition columns
835    pub fn projected_file_schema(&self) -> SchemaRef {
836        let fields = self.file_column_projection_indices().map(|indices| {
837            indices
838                .iter()
839                .map(|col_idx| self.file_schema.field(*col_idx))
840                .cloned()
841                .collect::<Vec<_>>()
842        });
843
844        fields.map_or_else(
845            || Arc::clone(&self.file_schema),
846            |f| {
847                Arc::new(Schema::new_with_metadata(
848                    f,
849                    self.file_schema.metadata.clone(),
850                ))
851            },
852        )
853    }
854
855    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
856        self.projection.as_ref().map(|p| {
857            p.iter()
858                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
859                .copied()
860                .collect()
861        })
862    }
863
864    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
865    ///
866    /// The method distributes files across a target number of partitions while ensuring
867    /// files within each partition maintain sort order based on their min/max statistics.
868    ///
869    /// The algorithm works by:
870    /// 1. Takes files sorted by minimum values
871    /// 2. For each file:
872    ///   - Finds eligible groups (empty or where file's min > group's last max)
873    ///   - Selects the smallest eligible group
874    ///   - Creates a new group if needed
875    ///
876    /// # Parameters
877    /// * `table_schema`: Schema containing information about the columns
878    /// * `file_groups`: The original file groups to split
879    /// * `sort_order`: The lexicographical ordering to maintain within each group
880    /// * `target_partitions`: The desired number of output partitions
881    ///
882    /// # Returns
883    /// A new set of file groups, where files within each group are non-overlapping with respect to
884    /// their min/max statistics and maintain the specified sort order.
885    pub fn split_groups_by_statistics_with_target_partitions(
886        table_schema: &SchemaRef,
887        file_groups: &[FileGroup],
888        sort_order: &LexOrdering,
889        target_partitions: usize,
890    ) -> Result<Vec<FileGroup>> {
891        if target_partitions == 0 {
892            return Err(DataFusionError::Internal(
893                "target_partitions must be greater than 0".to_string(),
894            ));
895        }
896
897        let flattened_files = file_groups
898            .iter()
899            .flat_map(FileGroup::iter)
900            .collect::<Vec<_>>();
901
902        if flattened_files.is_empty() {
903            return Ok(vec![]);
904        }
905
906        let statistics = MinMaxStatistics::new_from_files(
907            sort_order,
908            table_schema,
909            None,
910            flattened_files.iter().copied(),
911        )?;
912
913        let indices_sorted_by_min = statistics.min_values_sorted();
914
915        // Initialize with target_partitions empty groups
916        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
917
918        for (idx, min) in indices_sorted_by_min {
919            if let Some((_, group)) = file_groups_indices
920                .iter_mut()
921                .enumerate()
922                .filter(|(_, group)| {
923                    group.is_empty()
924                        || min
925                            > statistics
926                                .max(*group.last().expect("groups should not be empty"))
927                })
928                .min_by_key(|(_, group)| group.len())
929            {
930                group.push(idx);
931            } else {
932                // Create a new group if no existing group fits
933                file_groups_indices.push(vec![idx]);
934            }
935        }
936
937        // Remove any empty groups
938        file_groups_indices.retain(|group| !group.is_empty());
939
940        // Assemble indices back into groups of PartitionedFiles
941        Ok(file_groups_indices
942            .into_iter()
943            .map(|file_group_indices| {
944                FileGroup::new(
945                    file_group_indices
946                        .into_iter()
947                        .map(|idx| flattened_files[idx].clone())
948                        .collect(),
949                )
950            })
951            .collect())
952    }
953
954    /// Attempts to do a bin-packing on files into file groups, such that any two files
955    /// in a file group are ordered and non-overlapping with respect to their statistics.
956    /// It will produce the smallest number of file groups possible.
957    pub fn split_groups_by_statistics(
958        table_schema: &SchemaRef,
959        file_groups: &[FileGroup],
960        sort_order: &LexOrdering,
961    ) -> Result<Vec<FileGroup>> {
962        let flattened_files = file_groups
963            .iter()
964            .flat_map(FileGroup::iter)
965            .collect::<Vec<_>>();
966        // First Fit:
967        // * Choose the first file group that a file can be placed into.
968        // * If it fits into no existing file groups, create a new one.
969        //
970        // By sorting files by min values and then applying first-fit bin packing,
971        // we can produce the smallest number of file groups such that
972        // files within a group are in order and non-overlapping.
973        //
974        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
975        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
976
977        if flattened_files.is_empty() {
978            return Ok(vec![]);
979        }
980
981        let statistics = MinMaxStatistics::new_from_files(
982            sort_order,
983            table_schema,
984            None,
985            flattened_files.iter().copied(),
986        )
987        .map_err(|e| {
988            e.context("construct min/max statistics for split_groups_by_statistics")
989        })?;
990
991        let indices_sorted_by_min = statistics.min_values_sorted();
992        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
993
994        for (idx, min) in indices_sorted_by_min {
995            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
996                // If our file is non-overlapping and comes _after_ the last file,
997                // it fits in this file group.
998                min > statistics.max(
999                    *group
1000                        .last()
1001                        .expect("groups should be nonempty at construction"),
1002                )
1003            });
1004            match file_group_to_insert {
1005                Some(group) => group.push(idx),
1006                None => file_groups_indices.push(vec![idx]),
1007            }
1008        }
1009
1010        // Assemble indices back into groups of PartitionedFiles
1011        Ok(file_groups_indices
1012            .into_iter()
1013            .map(|file_group_indices| {
1014                file_group_indices
1015                    .into_iter()
1016                    .map(|idx| flattened_files[idx].clone())
1017                    .collect()
1018            })
1019            .collect())
1020    }
1021
1022    /// Write the data_type based on file_source
1023    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1024        write!(f, ", file_type={}", self.file_source.file_type())?;
1025        self.file_source.fmt_extra(t, f)
1026    }
1027
    /// Returns a reference to the shared [`FileSource`] used by this scan
    /// config.
    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }
1032}
1033
1034impl Debug for FileScanConfig {
1035    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1036        write!(f, "FileScanConfig {{")?;
1037        write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1038
1039        write!(
1040            f,
1041            "statistics={:?}, ",
1042            self.file_source.statistics().unwrap()
1043        )?;
1044
1045        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1046        write!(f, "}}")
1047    }
1048}
1049
1050impl DisplayAs for FileScanConfig {
1051    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1052        let schema = self.projected_schema();
1053        let orderings = get_projected_output_ordering(self, &schema);
1054
1055        write!(f, "file_groups=")?;
1056        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1057
1058        if !schema.fields().is_empty() {
1059            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1060        }
1061
1062        if let Some(limit) = self.limit {
1063            write!(f, ", limit={limit}")?;
1064        }
1065
1066        display_orderings(f, &orderings)?;
1067
1068        if !self.constraints.is_empty() {
1069            write!(f, ", {}", self.constraints)?;
1070        }
1071
1072        Ok(())
1073    }
1074}
1075
/// A helper that projects partition columns into the file record batches.
///
/// One interesting trick is the usage of a cache for the key buffers of the partition column
/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
pub struct PartitionColumnProjector {
    /// Cache of Arrow buffers initialized to zeros that represent the key array of all
    /// partition columns (partition columns are materialized by dictionary arrays with
    /// only one value in the dictionary, thus all the keys are equal to zero).
    key_buffer_cache: ZeroBufferGenerators,
    /// Mapping between the indexes in the list of partition columns and the target
    /// schema, stored as `(partition column index, target schema index)` pairs.
    /// Sorted by index in the target schema so that we can iterate on it to
    /// insert the partition columns in the target record batch.
    projected_partition_indexes: Vec<(usize, usize)>,
    /// The schema of the table once the projection was applied.
    projected_schema: SchemaRef,
}
1094
1095impl PartitionColumnProjector {
1096    // Create a projector to insert the partitioning columns into batches read from files
1097    // - `projected_schema`: the target schema with both file and partitioning columns
1098    // - `table_partition_cols`: all the partitioning column names
1099    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
1100        let mut idx_map = HashMap::new();
1101        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
1102            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
1103                idx_map.insert(partition_idx, schema_idx);
1104            }
1105        }
1106
1107        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
1108        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
1109
1110        Self {
1111            projected_partition_indexes,
1112            key_buffer_cache: Default::default(),
1113            projected_schema,
1114        }
1115    }
1116
1117    // Transform the batch read from the file by inserting the partitioning columns
1118    // to the right positions as deduced from `projected_schema`
1119    // - `file_batch`: batch read from the file, with internal projection applied
1120    // - `partition_values`: the list of partition values, one for each partition column
1121    pub fn project(
1122        &mut self,
1123        file_batch: RecordBatch,
1124        partition_values: &[ScalarValue],
1125    ) -> Result<RecordBatch> {
1126        let expected_cols =
1127            self.projected_schema.fields().len() - self.projected_partition_indexes.len();
1128
1129        if file_batch.columns().len() != expected_cols {
1130            return exec_err!(
1131                "Unexpected batch schema from file, expected {} cols but got {}",
1132                expected_cols,
1133                file_batch.columns().len()
1134            );
1135        }
1136
1137        let mut cols = file_batch.columns().to_vec();
1138        for &(pidx, sidx) in &self.projected_partition_indexes {
1139            let p_value =
1140                partition_values
1141                    .get(pidx)
1142                    .ok_or(DataFusionError::Execution(
1143                        "Invalid partitioning found on disk".to_string(),
1144                    ))?;
1145
1146            let mut partition_value = Cow::Borrowed(p_value);
1147
1148            // check if user forgot to dict-encode the partition value
1149            let field = self.projected_schema.field(sidx);
1150            let expected_data_type = field.data_type();
1151            let actual_data_type = partition_value.data_type();
1152            if let DataType::Dictionary(key_type, _) = expected_data_type {
1153                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
1154                    warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
1155                    partition_value = Cow::Owned(ScalarValue::Dictionary(
1156                        key_type.clone(),
1157                        Box::new(partition_value.as_ref().clone()),
1158                    ));
1159                }
1160            }
1161
1162            cols.insert(
1163                sidx,
1164                create_output_array(
1165                    &mut self.key_buffer_cache,
1166                    partition_value.as_ref(),
1167                    file_batch.num_rows(),
1168                )?,
1169            )
1170        }
1171
1172        RecordBatch::try_new_with_options(
1173            Arc::clone(&self.projected_schema),
1174            cols,
1175            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
1176        )
1177        .map_err(Into::into)
1178    }
1179}
1180
/// One [`ZeroBufferGenerator`] per integer type, so that each dictionary key
/// type selected in `create_output_array` has its own cached zero buffer.
#[derive(Debug, Default)]
struct ZeroBufferGenerators {
    // Signed key types
    gen_i8: ZeroBufferGenerator<i8>,
    gen_i16: ZeroBufferGenerator<i16>,
    gen_i32: ZeroBufferGenerator<i32>,
    gen_i64: ZeroBufferGenerator<i64>,
    // Unsigned key types
    gen_u8: ZeroBufferGenerator<u8>,
    gen_u16: ZeroBufferGenerator<u16>,
    gen_u32: ZeroBufferGenerator<u32>,
    gen_u64: ZeroBufferGenerator<u64>,
}
1192
/// Generates an Arrow [`Buffer`] that contains zero values of type `T`.
#[derive(Debug, Default)]
struct ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    /// The most recently built zero buffer, if any; handed out again (sliced)
    /// when it is large enough for a request.
    cache: Option<Buffer>,
    /// Ties the generator to the native type `T` without storing a value of it.
    _t: PhantomData<T>,
}
1202
1203impl<T> ZeroBufferGenerator<T>
1204where
1205    T: ArrowNativeType,
1206{
1207    const SIZE: usize = size_of::<T>();
1208
1209    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
1210        match &mut self.cache {
1211            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
1212                buf.slice_with_length(0, n_vals * Self::SIZE)
1213            }
1214            _ => {
1215                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
1216                key_buffer_builder.advance(n_vals); // keys are all 0
1217                self.cache.insert(key_buffer_builder.finish()).clone()
1218            }
1219        }
1220    }
1221}
1222
1223fn create_dict_array<T>(
1224    buffer_gen: &mut ZeroBufferGenerator<T>,
1225    dict_val: &ScalarValue,
1226    len: usize,
1227    data_type: DataType,
1228) -> Result<ArrayRef>
1229where
1230    T: ArrowNativeType,
1231{
1232    let dict_vals = dict_val.to_array()?;
1233
1234    let sliced_key_buffer = buffer_gen.get_buffer(len);
1235
1236    // assemble pieces together
1237    let mut builder = ArrayData::builder(data_type)
1238        .len(len)
1239        .add_buffer(sliced_key_buffer);
1240    builder = builder.add_child_data(dict_vals.to_data());
1241    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
1242        builder.build().unwrap(),
1243    )))
1244}
1245
1246fn create_output_array(
1247    key_buffer_cache: &mut ZeroBufferGenerators,
1248    val: &ScalarValue,
1249    len: usize,
1250) -> Result<ArrayRef> {
1251    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
1252        match key_type.as_ref() {
1253            DataType::Int8 => {
1254                return create_dict_array(
1255                    &mut key_buffer_cache.gen_i8,
1256                    dict_val,
1257                    len,
1258                    val.data_type(),
1259                );
1260            }
1261            DataType::Int16 => {
1262                return create_dict_array(
1263                    &mut key_buffer_cache.gen_i16,
1264                    dict_val,
1265                    len,
1266                    val.data_type(),
1267                );
1268            }
1269            DataType::Int32 => {
1270                return create_dict_array(
1271                    &mut key_buffer_cache.gen_i32,
1272                    dict_val,
1273                    len,
1274                    val.data_type(),
1275                );
1276            }
1277            DataType::Int64 => {
1278                return create_dict_array(
1279                    &mut key_buffer_cache.gen_i64,
1280                    dict_val,
1281                    len,
1282                    val.data_type(),
1283                );
1284            }
1285            DataType::UInt8 => {
1286                return create_dict_array(
1287                    &mut key_buffer_cache.gen_u8,
1288                    dict_val,
1289                    len,
1290                    val.data_type(),
1291                );
1292            }
1293            DataType::UInt16 => {
1294                return create_dict_array(
1295                    &mut key_buffer_cache.gen_u16,
1296                    dict_val,
1297                    len,
1298                    val.data_type(),
1299                );
1300            }
1301            DataType::UInt32 => {
1302                return create_dict_array(
1303                    &mut key_buffer_cache.gen_u32,
1304                    dict_val,
1305                    len,
1306                    val.data_type(),
1307                );
1308            }
1309            DataType::UInt64 => {
1310                return create_dict_array(
1311                    &mut key_buffer_cache.gen_u64,
1312                    dict_val,
1313                    len,
1314                    val.data_type(),
1315                );
1316            }
1317            _ => {}
1318        }
1319    }
1320
1321    val.to_array_of_size(len)
1322}
1323
/// The various listing tables do not attempt to read all files
/// concurrently; instead they read files in sequence within a
/// partition.  This is an important property as it allows plans to
/// run against 1000s of files and not try to open them all
/// concurrently.
1329///
1330/// However, it means if we assign more than one file to a partition
1331/// the output sort order will not be preserved as illustrated in the
1332/// following diagrams:
1333///
1334/// When only 1 file is assigned to each partition, each partition is
1335/// correctly sorted on `(A, B, C)`
1336///
1337/// ```text
1338///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
1339///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
1340///┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
1341///  │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
1342///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
1343///  │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
1344///┃                                          │                    │                     ┃
1345///  │                   │ │                    │                    │                 │
1346///┃                                          │                    │                     ┃
1347///  │                   │ │                    │                    │                 │
1348///┃                                          │                    │                     ┃
1349///  │                   │ │                    │                    │                 │
1350///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1351///     DataFusion           DataFusion           DataFusion           DataFusion
1352///┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
1353/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1354///
1355///                                      DataSourceExec
1356///```
1357///
1358/// However, when more than 1 file is assigned to each partition, each
1359/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
1360/// file is scanned, the same values for A, B and C can be repeated in
1361/// the same sorted stream
1362///
1363///```text
1364///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1365///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1366///┃   ┌───────────────┐     ┌──────────────┐ │
1367///  │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
1368///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1369///  │ └───────────────┘ │ │ └──────────────┘   ┃
1370///┃   ┌───────────────┐     ┌──────────────┐ │
1371///  │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
1372///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1373///  │ └───────────────┘ │ │ └──────────────┘   ┃
1374///┃                                          │
1375///  │                   │ │                    ┃
1376///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
1377///     DataFusion           DataFusion         ┃
1378///┃    Partition 1          Partition 2
1379/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
1380///
1381///              DataSourceExec
1382///```
1383fn get_projected_output_ordering(
1384    base_config: &FileScanConfig,
1385    projected_schema: &SchemaRef,
1386) -> Vec<LexOrdering> {
1387    let mut all_orderings = vec![];
1388    for output_ordering in &base_config.output_ordering {
1389        let mut new_ordering = vec![];
1390        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
1391            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1392                let name = col.name();
1393                if let Some((idx, _)) = projected_schema.column_with_name(name) {
1394                    // Compute the new sort expression (with correct index) after projection:
1395                    new_ordering.push(PhysicalSortExpr::new(
1396                        Arc::new(Column::new(name, idx)),
1397                        *options,
1398                    ));
1399                    continue;
1400                }
1401            }
1402            // Cannot find expression in the projected_schema, stop iterating
1403            // since rest of the orderings are violated
1404            break;
1405        }
1406
1407        let Some(new_ordering) = LexOrdering::new(new_ordering) else {
1408            continue;
1409        };
1410
1411        // Check if any file groups are not sorted
1412        if base_config.file_groups.iter().any(|group| {
1413            if group.len() <= 1 {
1414                // File groups with <= 1 files are always sorted
1415                return false;
1416            }
1417
1418            let statistics = match MinMaxStatistics::new_from_files(
1419                &new_ordering,
1420                projected_schema,
1421                base_config.projection.as_deref(),
1422                group.iter(),
1423            ) {
1424                Ok(statistics) => statistics,
1425                Err(e) => {
1426                    log::trace!("Error fetching statistics for file group: {e}");
1427                    // we can't prove that it's ordered, so we have to reject it
1428                    return true;
1429                }
1430            };
1431
1432            !statistics.is_sorted()
1433        }) {
1434            debug!(
1435                "Skipping specified output ordering {:?}. \
1436                Some file groups couldn't be determined to be sorted: {:?}",
1437                base_config.output_ordering[0], base_config.file_groups
1438            );
1439            continue;
1440        }
1441
1442        all_orderings.push(new_ordering);
1443    }
1444    all_orderings
1445}
1446
/// Convert type to a type suitable for use as a `ListingTable`
/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
/// a reasonable trade off between a reasonable number of partition
/// values and space efficiency.
///
/// Use this to specify types for partition columns. However
/// you MAY also choose not to dictionary-encode the data or to use a
/// different dictionary type.
///
/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same way.
pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}
1460
/// Wrap a partition column [`ScalarValue`] in a dictionary with `UInt16`
/// keys, matching the type produced by [`wrap_partition_type_in_dict`]
/// for the value's type.
pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}
1467
1468#[cfg(test)]
1469mod tests {
1470    use super::*;
1471    use crate::test_util::col;
1472    use crate::{
1473        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1474        verify_sort_integrity,
1475    };
1476
1477    use arrow::array::{Int32Array, RecordBatch};
1478    use datafusion_common::stats::Precision;
1479    use datafusion_common::{assert_batches_eq, internal_err};
1480    use datafusion_expr::{Operator, SortExpr};
1481    use datafusion_physical_expr::create_physical_sort_expr;
1482    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1483
1484    /// Returns the column names on the schema
1485    pub fn columns(schema: &Schema) -> Vec<String> {
1486        schema.fields().iter().map(|f| f.name().clone()).collect()
1487    }
1488
1489    #[test]
1490    fn physical_plan_config_no_projection() {
1491        let file_schema = aggr_test_schema();
1492        let conf = config_for_projection(
1493            Arc::clone(&file_schema),
1494            None,
1495            Statistics::new_unknown(&file_schema),
1496            to_partition_cols(vec![(
1497                "date".to_owned(),
1498                wrap_partition_type_in_dict(DataType::Utf8),
1499            )]),
1500        );
1501
1502        let (proj_schema, _, proj_statistics, _) = conf.project();
1503        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1504        assert_eq!(
1505            proj_schema.field(file_schema.fields().len()).name(),
1506            "date",
1507            "partition columns are the last columns"
1508        );
1509        assert_eq!(
1510            proj_statistics.column_statistics.len(),
1511            file_schema.fields().len() + 1
1512        );
1513        // TODO implement tests for partition column statistics once implemented
1514
1515        let col_names = conf.projected_file_column_names();
1516        assert_eq!(col_names, None);
1517
1518        let col_indices = conf.file_column_projection_indices();
1519        assert_eq!(col_indices, None);
1520    }
1521
1522    #[test]
1523    fn physical_plan_config_no_projection_tab_cols_as_field() {
1524        let file_schema = aggr_test_schema();
1525
1526        // make a table_partition_col as a field
1527        let table_partition_col =
1528            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1529                .with_metadata(HashMap::from_iter(vec![(
1530                    "key_whatever".to_owned(),
1531                    "value_whatever".to_owned(),
1532                )]));
1533
1534        let conf = config_for_projection(
1535            Arc::clone(&file_schema),
1536            None,
1537            Statistics::new_unknown(&file_schema),
1538            vec![table_partition_col.clone()],
1539        );
1540
1541        // verify the proj_schema includes the last column and exactly the same the field it is defined
1542        let proj_schema = conf.projected_schema();
1543        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1544        assert_eq!(
1545            *proj_schema.field(file_schema.fields().len()),
1546            table_partition_col,
1547            "partition columns are the last columns and ust have all values defined in created field"
1548        );
1549    }
1550
1551    #[test]
1552    fn physical_plan_config_with_projection() {
1553        let file_schema = aggr_test_schema();
1554        let conf = config_for_projection(
1555            Arc::clone(&file_schema),
1556            Some(vec![file_schema.fields().len(), 0]),
1557            Statistics {
1558                num_rows: Precision::Inexact(10),
1559                // assign the column index to distinct_count to help assert
1560                // the source statistic after the projection
1561                column_statistics: (0..file_schema.fields().len())
1562                    .map(|i| ColumnStatistics {
1563                        distinct_count: Precision::Inexact(i),
1564                        ..Default::default()
1565                    })
1566                    .collect(),
1567                total_byte_size: Precision::Absent,
1568            },
1569            to_partition_cols(vec![(
1570                "date".to_owned(),
1571                wrap_partition_type_in_dict(DataType::Utf8),
1572            )]),
1573        );
1574
1575        let (proj_schema, _, proj_statistics, _) = conf.project();
1576        assert_eq!(
1577            columns(&proj_schema),
1578            vec!["date".to_owned(), "c1".to_owned()]
1579        );
1580        let proj_stat_cols = proj_statistics.column_statistics;
1581        assert_eq!(proj_stat_cols.len(), 2);
1582        // TODO implement tests for proj_stat_cols[0] once partition column
1583        // statistics are implemented
1584        assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));
1585
1586        let col_names = conf.projected_file_column_names();
1587        assert_eq!(col_names, Some(vec!["c1".to_owned()]));
1588
1589        let col_indices = conf.file_column_projection_indices();
1590        assert_eq!(col_indices, Some(vec![0]));
1591    }
1592
1593    #[test]
1594    fn partition_column_projector() {
1595        let file_batch = build_table_i32(
1596            ("a", &vec![0, 1, 2]),
1597            ("b", &vec![-2, -1, 0]),
1598            ("c", &vec![10, 11, 12]),
1599        );
1600        let partition_cols = vec![
1601            (
1602                "year".to_owned(),
1603                wrap_partition_type_in_dict(DataType::Utf8),
1604            ),
1605            (
1606                "month".to_owned(),
1607                wrap_partition_type_in_dict(DataType::Utf8),
1608            ),
1609            (
1610                "day".to_owned(),
1611                wrap_partition_type_in_dict(DataType::Utf8),
1612            ),
1613        ];
1614        // create a projected schema
1615        let statistics = Statistics {
1616            num_rows: Precision::Inexact(3),
1617            total_byte_size: Precision::Absent,
1618            column_statistics: Statistics::unknown_column(&file_batch.schema()),
1619        };
1620
1621        let conf = config_for_projection(
1622            file_batch.schema(),
1623            // keep all cols from file and 2 from partitioning
1624            Some(vec![
1625                0,
1626                1,
1627                2,
1628                file_batch.schema().fields().len(),
1629                file_batch.schema().fields().len() + 2,
1630            ]),
1631            statistics.clone(),
1632            to_partition_cols(partition_cols.clone()),
1633        );
1634
1635        let source_statistics = conf.file_source.statistics().unwrap();
1636        let conf_stats = conf.statistics().unwrap();
1637
1638        // projection should be reflected in the file source statistics
1639        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
1640
1641        // 3 original statistics + 2 partition statistics
1642        assert_eq!(conf_stats.column_statistics.len(), 5);
1643
1644        // file statics should not be modified
1645        assert_eq!(source_statistics, statistics);
1646        assert_eq!(source_statistics.column_statistics.len(), 3);
1647
1648        let proj_schema = conf.projected_schema();
1649        // created a projector for that projected schema
1650        let mut proj = PartitionColumnProjector::new(
1651            proj_schema,
1652            &partition_cols
1653                .iter()
1654                .map(|x| x.0.clone())
1655                .collect::<Vec<_>>(),
1656        );
1657
1658        // project first batch
1659        let projected_batch = proj
1660            .project(
1661                // file_batch is ok here because we kept all the file cols in the projection
1662                file_batch,
1663                &[
1664                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1665                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1666                    wrap_partition_value_in_dict(ScalarValue::from("26")),
1667                ],
1668            )
1669            .expect("Projection of partition columns into record batch failed");
1670        let expected = [
1671            "+---+----+----+------+-----+",
1672            "| a | b  | c  | year | day |",
1673            "+---+----+----+------+-----+",
1674            "| 0 | -2 | 10 | 2021 | 26  |",
1675            "| 1 | -1 | 11 | 2021 | 26  |",
1676            "| 2 | 0  | 12 | 2021 | 26  |",
1677            "+---+----+----+------+-----+",
1678        ];
1679        assert_batches_eq!(expected, &[projected_batch]);
1680
1681        // project another batch that is larger than the previous one
1682        let file_batch = build_table_i32(
1683            ("a", &vec![5, 6, 7, 8, 9]),
1684            ("b", &vec![-10, -9, -8, -7, -6]),
1685            ("c", &vec![12, 13, 14, 15, 16]),
1686        );
1687        let projected_batch = proj
1688            .project(
1689                // file_batch is ok here because we kept all the file cols in the projection
1690                file_batch,
1691                &[
1692                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1693                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1694                    wrap_partition_value_in_dict(ScalarValue::from("27")),
1695                ],
1696            )
1697            .expect("Projection of partition columns into record batch failed");
1698        let expected = [
1699            "+---+-----+----+------+-----+",
1700            "| a | b   | c  | year | day |",
1701            "+---+-----+----+------+-----+",
1702            "| 5 | -10 | 12 | 2021 | 27  |",
1703            "| 6 | -9  | 13 | 2021 | 27  |",
1704            "| 7 | -8  | 14 | 2021 | 27  |",
1705            "| 8 | -7  | 15 | 2021 | 27  |",
1706            "| 9 | -6  | 16 | 2021 | 27  |",
1707            "+---+-----+----+------+-----+",
1708        ];
1709        assert_batches_eq!(expected, &[projected_batch]);
1710
1711        // project another batch that is smaller than the previous one
1712        let file_batch = build_table_i32(
1713            ("a", &vec![0, 1, 3]),
1714            ("b", &vec![2, 3, 4]),
1715            ("c", &vec![4, 5, 6]),
1716        );
1717        let projected_batch = proj
1718            .project(
1719                // file_batch is ok here because we kept all the file cols in the projection
1720                file_batch,
1721                &[
1722                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1723                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1724                    wrap_partition_value_in_dict(ScalarValue::from("28")),
1725                ],
1726            )
1727            .expect("Projection of partition columns into record batch failed");
1728        let expected = [
1729            "+---+---+---+------+-----+",
1730            "| a | b | c | year | day |",
1731            "+---+---+---+------+-----+",
1732            "| 0 | 2 | 4 | 2021 | 28  |",
1733            "| 1 | 3 | 5 | 2021 | 28  |",
1734            "| 3 | 4 | 6 | 2021 | 28  |",
1735            "+---+---+---+------+-----+",
1736        ];
1737        assert_batches_eq!(expected, &[projected_batch]);
1738
1739        // forgot to dictionary-wrap the scalar value
1740        let file_batch = build_table_i32(
1741            ("a", &vec![0, 1, 2]),
1742            ("b", &vec![-2, -1, 0]),
1743            ("c", &vec![10, 11, 12]),
1744        );
1745        let projected_batch = proj
1746            .project(
1747                // file_batch is ok here because we kept all the file cols in the projection
1748                file_batch,
1749                &[
1750                    ScalarValue::from("2021"),
1751                    ScalarValue::from("10"),
1752                    ScalarValue::from("26"),
1753                ],
1754            )
1755            .expect("Projection of partition columns into record batch failed");
1756        let expected = [
1757            "+---+----+----+------+-----+",
1758            "| a | b  | c  | year | day |",
1759            "+---+----+----+------+-----+",
1760            "| 0 | -2 | 10 | 2021 | 26  |",
1761            "| 1 | -1 | 11 | 2021 | 26  |",
1762            "| 2 | 0  | 12 | 2021 | 26  |",
1763            "+---+----+----+------+-----+",
1764        ];
1765        assert_batches_eq!(expected, &[projected_batch]);
1766    }
1767
1768    #[test]
1769    fn test_projected_file_schema_with_partition_col() {
1770        let schema = aggr_test_schema();
1771        let partition_cols = vec![
1772            (
1773                "part1".to_owned(),
1774                wrap_partition_type_in_dict(DataType::Utf8),
1775            ),
1776            (
1777                "part2".to_owned(),
1778                wrap_partition_type_in_dict(DataType::Utf8),
1779            ),
1780        ];
1781
1782        // Projected file schema for config with projection including partition column
1783        let projection = config_for_projection(
1784            schema.clone(),
1785            Some(vec![0, 3, 5, schema.fields().len()]),
1786            Statistics::new_unknown(&schema),
1787            to_partition_cols(partition_cols),
1788        )
1789        .projected_file_schema();
1790
1791        // Assert partition column filtered out in projected file schema
1792        let expected_columns = vec!["c1", "c4", "c6"];
1793        let actual_columns = projection
1794            .fields()
1795            .iter()
1796            .map(|f| f.name().clone())
1797            .collect::<Vec<_>>();
1798        assert_eq!(expected_columns, actual_columns);
1799    }
1800
1801    #[test]
1802    fn test_projected_file_schema_without_projection() {
1803        let schema = aggr_test_schema();
1804        let partition_cols = vec![
1805            (
1806                "part1".to_owned(),
1807                wrap_partition_type_in_dict(DataType::Utf8),
1808            ),
1809            (
1810                "part2".to_owned(),
1811                wrap_partition_type_in_dict(DataType::Utf8),
1812            ),
1813        ];
1814
1815        // Projected file schema for config without projection
1816        let projection = config_for_projection(
1817            schema.clone(),
1818            None,
1819            Statistics::new_unknown(&schema),
1820            to_partition_cols(partition_cols),
1821        )
1822        .projected_file_schema();
1823
1824        // Assert projected file schema is equal to file schema
1825        assert_eq!(projection.fields(), schema.fields());
1826    }
1827
1828    #[test]
1829    fn test_split_groups_by_statistics() -> Result<()> {
1830        use chrono::TimeZone;
1831        use datafusion_common::DFSchema;
1832        use datafusion_expr::execution_props::ExecutionProps;
1833        use object_store::{path::Path, ObjectMeta};
1834
1835        struct File {
1836            name: &'static str,
1837            date: &'static str,
1838            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1839        }
1840        impl File {
1841            fn new(
1842                name: &'static str,
1843                date: &'static str,
1844                statistics: Vec<Option<(f64, f64)>>,
1845            ) -> Self {
1846                Self::new_nullable(
1847                    name,
1848                    date,
1849                    statistics
1850                        .into_iter()
1851                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1852                        .collect(),
1853                )
1854            }
1855
1856            fn new_nullable(
1857                name: &'static str,
1858                date: &'static str,
1859                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1860            ) -> Self {
1861                Self {
1862                    name,
1863                    date,
1864                    statistics,
1865                }
1866            }
1867        }
1868
1869        struct TestCase {
1870            name: &'static str,
1871            file_schema: Schema,
1872            files: Vec<File>,
1873            sort: Vec<SortExpr>,
1874            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1875        }
1876
1877        use datafusion_expr::col;
1878        let cases = vec![
1879            TestCase {
1880                name: "test sort",
1881                file_schema: Schema::new(vec![Field::new(
1882                    "value".to_string(),
1883                    DataType::Float64,
1884                    false,
1885                )]),
1886                files: vec![
1887                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1888                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1889                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1890                ],
1891                sort: vec![col("value").sort(true, false)],
1892                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1893            },
1894            // same input but file '2' is in the middle
1895            // test that we still order correctly
1896            TestCase {
1897                name: "test sort with files ordered differently",
1898                file_schema: Schema::new(vec![Field::new(
1899                    "value".to_string(),
1900                    DataType::Float64,
1901                    false,
1902                )]),
1903                files: vec![
1904                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1905                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1906                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1907                ],
1908                sort: vec![col("value").sort(true, false)],
1909                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1910            },
1911            TestCase {
1912                name: "reverse sort",
1913                file_schema: Schema::new(vec![Field::new(
1914                    "value".to_string(),
1915                    DataType::Float64,
1916                    false,
1917                )]),
1918                files: vec![
1919                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1920                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1921                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1922                ],
1923                sort: vec![col("value").sort(false, true)],
1924                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1925            },
1926            TestCase {
1927                name: "nullable sort columns, nulls last",
1928                file_schema: Schema::new(vec![Field::new(
1929                    "value".to_string(),
1930                    DataType::Float64,
1931                    true,
1932                )]),
1933                files: vec![
1934                    File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
1935                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
1936                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
1937                ],
1938                sort: vec![col("value").sort(true, false)],
1939                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
1940            },
1941            TestCase {
1942                name: "nullable sort columns, nulls first",
1943                file_schema: Schema::new(vec![Field::new(
1944                    "value".to_string(),
1945                    DataType::Float64,
1946                    true,
1947                )]),
1948                files: vec![
1949                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
1950                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
1951                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
1952                ],
1953                sort: vec![col("value").sort(true, true)],
1954                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
1955            },
1956            TestCase {
1957                name: "all three non-overlapping",
1958                file_schema: Schema::new(vec![Field::new(
1959                    "value".to_string(),
1960                    DataType::Float64,
1961                    false,
1962                )]),
1963                files: vec![
1964                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1965                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
1966                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
1967                ],
1968                sort: vec![col("value").sort(true, false)],
1969                expected_result: Ok(vec![vec!["0", "1", "2"]]),
1970            },
1971            TestCase {
1972                name: "all three overlapping",
1973                file_schema: Schema::new(vec![Field::new(
1974                    "value".to_string(),
1975                    DataType::Float64,
1976                    false,
1977                )]),
1978                files: vec![
1979                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1980                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1981                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
1982                ],
1983                sort: vec![col("value").sort(true, false)],
1984                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
1985            },
1986            TestCase {
1987                name: "empty input",
1988                file_schema: Schema::new(vec![Field::new(
1989                    "value".to_string(),
1990                    DataType::Float64,
1991                    false,
1992                )]),
1993                files: vec![],
1994                sort: vec![col("value").sort(true, false)],
1995                expected_result: Ok(vec![]),
1996            },
1997            TestCase {
1998                name: "one file missing statistics",
1999                file_schema: Schema::new(vec![Field::new(
2000                    "value".to_string(),
2001                    DataType::Float64,
2002                    false,
2003                )]),
2004                files: vec![
2005                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2006                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
2007                    File::new("2", "2023-01-02", vec![None]),
2008                ],
2009                sort: vec![col("value").sort(true, false)],
2010                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
2011            },
2012        ];
2013
2014        for case in cases {
2015            let table_schema = Arc::new(Schema::new(
2016                case.file_schema
2017                    .fields()
2018                    .clone()
2019                    .into_iter()
2020                    .cloned()
2021                    .chain(Some(Arc::new(Field::new(
2022                        "date".to_string(),
2023                        DataType::Utf8,
2024                        false,
2025                    ))))
2026                    .collect::<Vec<_>>(),
2027            ));
2028            let Some(sort_order) = LexOrdering::new(
2029                case.sort
2030                    .into_iter()
2031                    .map(|expr| {
2032                        create_physical_sort_expr(
2033                            &expr,
2034                            &DFSchema::try_from(Arc::clone(&table_schema))?,
2035                            &ExecutionProps::default(),
2036                        )
2037                    })
2038                    .collect::<Result<Vec<_>>>()?,
2039            ) else {
2040                return internal_err!("This test should always use an ordering");
2041            };
2042
2043            let partitioned_files = FileGroup::new(
2044                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
2045            );
2046            let result = FileScanConfig::split_groups_by_statistics(
2047                &table_schema,
2048                std::slice::from_ref(&partitioned_files),
2049                &sort_order,
2050            );
2051            let results_by_name = result
2052                .as_ref()
2053                .map(|file_groups| {
2054                    file_groups
2055                        .iter()
2056                        .map(|file_group| {
2057                            file_group
2058                                .iter()
2059                                .map(|file| {
2060                                    partitioned_files
2061                                        .iter()
2062                                        .find_map(|f| {
2063                                            if f.object_meta == file.object_meta {
2064                                                Some(
2065                                                    f.object_meta
2066                                                        .location
2067                                                        .as_ref()
2068                                                        .rsplit('/')
2069                                                        .next()
2070                                                        .unwrap()
2071                                                        .trim_end_matches(".parquet"),
2072                                                )
2073                                            } else {
2074                                                None
2075                                            }
2076                                        })
2077                                        .unwrap()
2078                                })
2079                                .collect::<Vec<_>>()
2080                        })
2081                        .collect::<Vec<_>>()
2082                })
2083                .map_err(|e| e.strip_backtrace().leak() as &'static str);
2084
2085            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
2086        }
2087
2088        return Ok(());
2089
2090        impl From<File> for PartitionedFile {
2091            fn from(file: File) -> Self {
2092                PartitionedFile {
2093                    object_meta: ObjectMeta {
2094                        location: Path::from(format!(
2095                            "data/date={}/{}.parquet",
2096                            file.date, file.name
2097                        )),
2098                        last_modified: chrono::Utc.timestamp_nanos(0),
2099                        size: 0,
2100                        e_tag: None,
2101                        version: None,
2102                    },
2103                    partition_values: vec![ScalarValue::from(file.date)],
2104                    range: None,
2105                    statistics: Some(Arc::new(Statistics {
2106                        num_rows: Precision::Absent,
2107                        total_byte_size: Precision::Absent,
2108                        column_statistics: file
2109                            .statistics
2110                            .into_iter()
2111                            .map(|stats| {
2112                                stats
2113                                    .map(|(min, max)| ColumnStatistics {
2114                                        min_value: Precision::Exact(
2115                                            ScalarValue::Float64(min),
2116                                        ),
2117                                        max_value: Precision::Exact(
2118                                            ScalarValue::Float64(max),
2119                                        ),
2120                                        ..Default::default()
2121                                    })
2122                                    .unwrap_or_default()
2123                            })
2124                            .collect::<Vec<_>>(),
2125                    })),
2126                    extensions: None,
2127                    metadata_size_hint: None,
2128                }
2129            }
2130        }
2131    }
2132
2133    // sets default for configs that play no role in projections
2134    fn config_for_projection(
2135        file_schema: SchemaRef,
2136        projection: Option<Vec<usize>>,
2137        statistics: Statistics,
2138        table_partition_cols: Vec<Field>,
2139    ) -> FileScanConfig {
2140        FileScanConfigBuilder::new(
2141            ObjectStoreUrl::parse("test:///").unwrap(),
2142            file_schema,
2143            Arc::new(MockSource::default()),
2144        )
2145        .with_projection(projection)
2146        .with_statistics(statistics)
2147        .with_table_partition_cols(table_partition_cols)
2148        .build()
2149    }
2150
2151    /// Convert partition columns from Vec<String DataType> to Vec<Field>
2152    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
2153        table_partition_cols
2154            .iter()
2155            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
2156            .collect::<Vec<_>>()
2157    }
2158
2159    /// returns record batch with 3 columns of i32 in memory
2160    pub fn build_table_i32(
2161        a: (&str, &Vec<i32>),
2162        b: (&str, &Vec<i32>),
2163        c: (&str, &Vec<i32>),
2164    ) -> RecordBatch {
2165        let schema = Schema::new(vec![
2166            Field::new(a.0, DataType::Int32, false),
2167            Field::new(b.0, DataType::Int32, false),
2168            Field::new(c.0, DataType::Int32, false),
2169        ]);
2170
2171        RecordBatch::try_new(
2172            Arc::new(schema),
2173            vec![
2174                Arc::new(Int32Array::from(a.1.clone())),
2175                Arc::new(Int32Array::from(b.1.clone())),
2176                Arc::new(Int32Array::from(c.1.clone())),
2177            ],
2178        )
2179        .unwrap()
2180    }
2181
2182    #[test]
2183    fn test_file_scan_config_builder() {
2184        let file_schema = aggr_test_schema();
2185        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2186        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2187
2188        // Create a builder with required parameters
2189        let builder = FileScanConfigBuilder::new(
2190            object_store_url.clone(),
2191            Arc::clone(&file_schema),
2192            Arc::clone(&file_source),
2193        );
2194
2195        // Build with various configurations
2196        let config = builder
2197            .with_limit(Some(1000))
2198            .with_projection(Some(vec![0, 1]))
2199            .with_table_partition_cols(vec![Field::new(
2200                "date",
2201                wrap_partition_type_in_dict(DataType::Utf8),
2202                false,
2203            )])
2204            .with_statistics(Statistics::new_unknown(&file_schema))
2205            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
2206                "test.parquet".to_string(),
2207                1024,
2208            )])])
2209            .with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
2210                Column::new("date", 0),
2211            ))]
2212            .into()])
2213            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
2214            .with_newlines_in_values(true)
2215            .build();
2216
2217        // Verify the built config has all the expected values
2218        assert_eq!(config.object_store_url, object_store_url);
2219        assert_eq!(config.file_schema, file_schema);
2220        assert_eq!(config.limit, Some(1000));
2221        assert_eq!(config.projection, Some(vec![0, 1]));
2222        assert_eq!(config.table_partition_cols.len(), 1);
2223        assert_eq!(config.table_partition_cols[0].name(), "date");
2224        assert_eq!(config.file_groups.len(), 1);
2225        assert_eq!(config.file_groups[0].len(), 1);
2226        assert_eq!(
2227            config.file_groups[0][0].object_meta.location.as_ref(),
2228            "test.parquet"
2229        );
2230        assert_eq!(
2231            config.file_compression_type,
2232            FileCompressionType::UNCOMPRESSED
2233        );
2234        assert!(config.new_lines_in_values);
2235        assert_eq!(config.output_ordering.len(), 1);
2236    }
2237
2238    #[test]
2239    fn equivalence_properties_after_schema_change() {
2240        let file_schema = aggr_test_schema();
2241        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2242        // Create a file source with a filter
2243        let file_source: Arc<dyn FileSource> =
2244            Arc::new(MockSource::default().with_filter(Arc::new(BinaryExpr::new(
2245                col("c2", &file_schema).unwrap(),
2246                Operator::Eq,
2247                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2248            ))));
2249
2250        let config = FileScanConfigBuilder::new(
2251            object_store_url.clone(),
2252            Arc::clone(&file_schema),
2253            Arc::clone(&file_source),
2254        )
2255        .with_projection(Some(vec![0, 1, 2]))
2256        .build();
2257
2258        // Simulate projection being updated. Since the filter has already been pushed down,
2259        // the new projection won't include the filtered column.
2260        let data_source = config
2261            .try_swapping_with_projection(&[ProjectionExpr::new(
2262                col("c3", &file_schema).unwrap(),
2263                "c3".to_string(),
2264            )])
2265            .unwrap()
2266            .unwrap();
2267
2268        // Gather the equivalence properties from the new data source. There should
2269        // be no equivalence class for column c2 since it was removed by the projection.
2270        let eq_properties = data_source.eq_properties();
2271        let eq_group = eq_properties.eq_group();
2272
2273        for class in eq_group.iter() {
2274            for expr in class.iter() {
2275                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
2276                    assert_ne!(
2277                        col.name(),
2278                        "c2",
2279                        "c2 should not be present in any equivalence class"
2280                    );
2281                }
2282            }
2283        }
2284    }
2285
2286    #[test]
2287    fn test_file_scan_config_builder_defaults() {
2288        let file_schema = aggr_test_schema();
2289        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2290        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2291
2292        // Create a builder with only required parameters and build without any additional configurations
2293        let config = FileScanConfigBuilder::new(
2294            object_store_url.clone(),
2295            Arc::clone(&file_schema),
2296            Arc::clone(&file_source),
2297        )
2298        .build();
2299
2300        // Verify default values
2301        assert_eq!(config.object_store_url, object_store_url);
2302        assert_eq!(config.file_schema, file_schema);
2303        assert_eq!(config.limit, None);
2304        assert_eq!(config.projection, None);
2305        assert!(config.table_partition_cols.is_empty());
2306        assert!(config.file_groups.is_empty());
2307        assert_eq!(
2308            config.file_compression_type,
2309            FileCompressionType::UNCOMPRESSED
2310        );
2311        assert!(!config.new_lines_in_values);
2312        assert!(config.output_ordering.is_empty());
2313        assert!(config.constraints.is_empty());
2314
2315        // Verify statistics are set to unknown
2316        assert_eq!(
2317            config.file_source.statistics().unwrap().num_rows,
2318            Precision::Absent
2319        );
2320        assert_eq!(
2321            config.file_source.statistics().unwrap().total_byte_size,
2322            Precision::Absent
2323        );
2324        assert_eq!(
2325            config
2326                .file_source
2327                .statistics()
2328                .unwrap()
2329                .column_statistics
2330                .len(),
2331            file_schema.fields().len()
2332        );
2333        for stat in config.file_source.statistics().unwrap().column_statistics {
2334            assert_eq!(stat.distinct_count, Precision::Absent);
2335            assert_eq!(stat.min_value, Precision::Absent);
2336            assert_eq!(stat.max_value, Precision::Absent);
2337            assert_eq!(stat.null_count, Precision::Absent);
2338        }
2339    }
2340
2341    #[test]
2342    fn test_file_scan_config_builder_new_from() {
2343        let schema = aggr_test_schema();
2344        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2345        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2346        let partition_cols = vec![Field::new(
2347            "date",
2348            wrap_partition_type_in_dict(DataType::Utf8),
2349            false,
2350        )];
2351        let file = PartitionedFile::new("test_file.parquet", 100);
2352
2353        // Create a config with non-default values
2354        let original_config = FileScanConfigBuilder::new(
2355            object_store_url.clone(),
2356            Arc::clone(&schema),
2357            Arc::clone(&file_source),
2358        )
2359        .with_projection(Some(vec![0, 2]))
2360        .with_limit(Some(10))
2361        .with_table_partition_cols(partition_cols.clone())
2362        .with_file(file.clone())
2363        .with_constraints(Constraints::default())
2364        .with_newlines_in_values(true)
2365        .build();
2366
2367        // Create a new builder from the config
2368        let new_builder = FileScanConfigBuilder::from(original_config);
2369
2370        // Build a new config from this builder
2371        let new_config = new_builder.build();
2372
2373        // Verify properties match
2374        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2375        assert_eq!(new_config.object_store_url, object_store_url);
2376        assert_eq!(new_config.file_schema, schema);
2377        assert_eq!(new_config.projection, Some(vec![0, 2]));
2378        assert_eq!(new_config.limit, Some(10));
2379        assert_eq!(new_config.table_partition_cols, partition_cols);
2380        assert_eq!(new_config.file_groups.len(), 1);
2381        assert_eq!(new_config.file_groups[0].len(), 1);
2382        assert_eq!(
2383            new_config.file_groups[0][0].object_meta.location.as_ref(),
2384            "test_file.parquet"
2385        );
2386        assert_eq!(new_config.constraints, Constraints::default());
2387        assert!(new_config.new_lines_in_values);
2388    }
2389
2390    #[test]
2391    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2392        use datafusion_common::DFSchema;
2393        use datafusion_expr::{col, execution_props::ExecutionProps};
2394
2395        let schema = Arc::new(Schema::new(vec![Field::new(
2396            "value",
2397            DataType::Float64,
2398            false,
2399        )]));
2400
2401        // Setup sort expression
2402        let exec_props = ExecutionProps::new();
2403        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2404        let sort_expr = [col("value").sort(true, false)];
2405        let sort_ordering = sort_expr
2406            .map(|expr| {
2407                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
2408            })
2409            .into();
2410
2411        // Test case parameters
2412        struct TestCase {
2413            name: String,
2414            file_count: usize,
2415            overlap_factor: f64,
2416            target_partitions: usize,
2417            expected_partition_count: usize,
2418        }
2419
2420        let test_cases = vec![
2421            // Basic cases
2422            TestCase {
2423                name: "no_overlap_10_files_4_partitions".to_string(),
2424                file_count: 10,
2425                overlap_factor: 0.0,
2426                target_partitions: 4,
2427                expected_partition_count: 4,
2428            },
2429            TestCase {
2430                name: "medium_overlap_20_files_5_partitions".to_string(),
2431                file_count: 20,
2432                overlap_factor: 0.5,
2433                target_partitions: 5,
2434                expected_partition_count: 5,
2435            },
2436            TestCase {
2437                name: "high_overlap_30_files_3_partitions".to_string(),
2438                file_count: 30,
2439                overlap_factor: 0.8,
2440                target_partitions: 3,
2441                expected_partition_count: 7,
2442            },
2443            // Edge cases
2444            TestCase {
2445                name: "fewer_files_than_partitions".to_string(),
2446                file_count: 3,
2447                overlap_factor: 0.0,
2448                target_partitions: 10,
2449                expected_partition_count: 3, // Should only create as many partitions as files
2450            },
2451            TestCase {
2452                name: "single_file".to_string(),
2453                file_count: 1,
2454                overlap_factor: 0.0,
2455                target_partitions: 5,
2456                expected_partition_count: 1, // Should create only one partition
2457            },
2458            TestCase {
2459                name: "empty_files".to_string(),
2460                file_count: 0,
2461                overlap_factor: 0.0,
2462                target_partitions: 3,
2463                expected_partition_count: 0, // Empty result for empty input
2464            },
2465        ];
2466
2467        for case in test_cases {
2468            println!("Running test case: {}", case.name);
2469
2470            // Generate files using bench utility function
2471            let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2472
2473            // Call the function under test
2474            let result =
2475                FileScanConfig::split_groups_by_statistics_with_target_partitions(
2476                    &schema,
2477                    &file_groups,
2478                    &sort_ordering,
2479                    case.target_partitions,
2480                )?;
2481
2482            // Verify results
2483            println!(
2484                "Created {} partitions (target was {})",
2485                result.len(),
2486                case.target_partitions
2487            );
2488
2489            // Check partition count
2490            assert_eq!(
2491                result.len(),
2492                case.expected_partition_count,
2493                "Case '{}': Unexpected partition count",
2494                case.name
2495            );
2496
2497            // Verify sort integrity
2498            assert!(
2499                verify_sort_integrity(&result),
2500                "Case '{}': Files within partitions are not properly ordered",
2501                case.name
2502            );
2503
2504            // Distribution check for partitions
2505            if case.file_count > 1 && case.expected_partition_count > 1 {
2506                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2507                let max_size = *group_sizes.iter().max().unwrap();
2508                let min_size = *group_sizes.iter().min().unwrap();
2509
2510                // Check partition balancing - difference shouldn't be extreme
2511                let avg_files_per_partition =
2512                    case.file_count as f64 / case.expected_partition_count as f64;
2513                assert!(
2514                    (max_size as f64) < 2.0 * avg_files_per_partition,
2515                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2516                    case.name,
2517                    max_size,
2518                    avg_files_per_partition
2519                );
2520
2521                println!("Distribution - min files: {min_size}, max files: {max_size}");
2522            }
2523        }
2524
2525        // Test error case: zero target partitions
2526        let empty_groups: Vec<FileGroup> = vec![];
2527        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2528            &schema,
2529            &empty_groups,
2530            &sort_ordering,
2531            0,
2532        )
2533        .unwrap_err();
2534
2535        assert!(
2536            err.to_string()
2537                .contains("target_partitions must be greater than 0"),
2538            "Expected error for zero target partitions"
2539        );
2540
2541        Ok(())
2542    }
2543}