datafusion_datasource/
file_scan_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21use std::{
22    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
24};
25
26use crate::file_groups::FileGroup;
27#[allow(unused_imports)]
28use crate::schema_adapter::SchemaAdapterFactory;
29use crate::{
30    display::FileGroupsDisplay, file::FileSource,
31    file_compression_type::FileCompressionType, file_stream::FileStream,
32    source::DataSource, statistics::MinMaxStatistics, PartitionedFile,
33};
34use arrow::datatypes::FieldRef;
35use arrow::{
36    array::{
37        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
38        RecordBatchOptions,
39    },
40    buffer::Buffer,
41    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
42};
43use datafusion_common::config::ConfigOptions;
44use datafusion_common::{
45    exec_err, ColumnStatistics, Constraints, DataFusionError, Result, ScalarValue,
46    Statistics,
47};
48use datafusion_execution::{
49    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
50};
51use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_columns};
52use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
53use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
54use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
55use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
56use datafusion_physical_plan::projection::ProjectionExpr;
57use datafusion_physical_plan::{
58    display::{display_orderings, ProjectSchemaDisplay},
59    metrics::ExecutionPlanMetricsSet,
60    projection::{all_alias_free_columns, new_projections_for_columns},
61    DisplayAs, DisplayFormatType,
62};
63use datafusion_physical_plan::{
64    filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
65};
66
67use datafusion_physical_plan::coop::cooperative;
68use datafusion_physical_plan::execution_plan::SchedulingType;
69use log::{debug, warn};
70
71/// The base configurations for a [`DataSourceExec`], the a physical plan for
72/// any given file format.
73///
74/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a ``FileScanConfig`.
75///
76/// # Example
77/// ```
78/// # use std::any::Any;
79/// # use std::sync::Arc;
80/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
81/// # use object_store::ObjectStore;
82/// # use datafusion_common::Statistics;
83/// # use datafusion_common::Result;
84/// # use datafusion_datasource::file::FileSource;
85/// # use datafusion_datasource::file_groups::FileGroup;
86/// # use datafusion_datasource::PartitionedFile;
87/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
88/// # use datafusion_datasource::file_stream::FileOpener;
89/// # use datafusion_datasource::source::DataSourceExec;
90/// # use datafusion_execution::object_store::ObjectStoreUrl;
91/// # use datafusion_physical_plan::ExecutionPlan;
92/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
93/// # use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
94/// # let file_schema = Arc::new(Schema::new(vec![
95/// #  Field::new("c1", DataType::Int32, false),
96/// #  Field::new("c2", DataType::Int32, false),
97/// #  Field::new("c3", DataType::Int32, false),
98/// #  Field::new("c4", DataType::Int32, false),
99/// # ]));
100/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
101/// #[derive(Clone)]
102/// # struct ParquetSource {
103/// #    projected_statistics: Option<Statistics>,
104/// #    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>
105/// # };
106/// # impl FileSource for ParquetSource {
107/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
108/// #  fn as_any(&self) -> &dyn Any { self  }
109/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
110/// #  fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
111/// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
112/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
113/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
114/// #  fn statistics(&self) -> Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
115/// #  fn file_type(&self) -> &str { "parquet" }
116/// #  fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Result<Arc<dyn FileSource>> { Ok(Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} )) }
117/// #  fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { self.schema_adapter_factory.clone() }
118/// #  }
119/// # impl ParquetSource {
120/// #  fn new() -> Self { Self {projected_statistics: None, schema_adapter_factory: None} }
121/// # }
122/// // create FileScan config for reading parquet files from file://
123/// let object_store_url = ObjectStoreUrl::local_filesystem();
124/// let file_source = Arc::new(ParquetSource::new());
125/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
126///   .with_limit(Some(1000))            // read only the first 1000 records
127///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
128///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
129///   .with_file(PartitionedFile::new("file1.parquet", 1234))
130///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
131///   // in a  single row group
132///   .with_file_group(FileGroup::new(vec![
133///    PartitionedFile::new("file2.parquet", 56),
134///    PartitionedFile::new("file3.parquet", 78),
135///   ])).build();
136/// // create an execution plan from the config
137/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
138/// ```
139///
140/// [`DataSourceExec`]: crate::source::DataSourceExec
141/// [`DataSourceExec::from_data_source`]: crate::source::DataSourceExec::from_data_source
142#[derive(Clone)]
143pub struct FileScanConfig {
144    /// Object store URL, used to get an [`ObjectStore`] instance from
145    /// [`RuntimeEnv::object_store`]
146    ///
147    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
148    /// as `file://` or `s3://my_bucket`. It should not include the path to the
149    /// file itself. The relevant URL prefix must be registered via
150    /// [`RuntimeEnv::register_object_store`]
151    ///
152    /// [`ObjectStore`]: object_store::ObjectStore
153    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
154    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
155    pub object_store_url: ObjectStoreUrl,
156    /// Schema before `projection` is applied. It contains the all columns that may
157    /// appear in the files. It does not include table partition columns
158    /// that may be added.
159    /// Note that this is **not** the schema of the physical files.
160    /// This is the schema that the physical file schema will be
161    /// mapped onto, and the schema that the [`DataSourceExec`] will return.
162    ///
163    /// [`DataSourceExec`]: crate::source::DataSourceExec
164    pub file_schema: SchemaRef,
165    /// List of files to be processed, grouped into partitions
166    ///
167    /// Each file must have a schema of `file_schema` or a subset. If
168    /// a particular file has a subset, the missing columns are
169    /// padded with NULLs.
170    ///
171    /// DataFusion may attempt to read each partition of files
172    /// concurrently, however files *within* a partition will be read
173    /// sequentially, one after the next.
174    pub file_groups: Vec<FileGroup>,
175    /// Table constraints
176    pub constraints: Constraints,
177    /// Columns on which to project the data. Indexes that are higher than the
178    /// number of columns of `file_schema` refer to `table_partition_cols`.
179    pub projection: Option<Vec<usize>>,
180    /// The maximum number of records to read from this plan. If `None`,
181    /// all records after filtering are returned.
182    pub limit: Option<usize>,
183    /// The partitioning columns
184    pub table_partition_cols: Vec<FieldRef>,
185    /// All equivalent lexicographical orderings that describe the schema.
186    pub output_ordering: Vec<LexOrdering>,
187    /// File compression type
188    pub file_compression_type: FileCompressionType,
189    /// Are new lines in values supported for CSVOptions
190    pub new_lines_in_values: bool,
191    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
192    pub file_source: Arc<dyn FileSource>,
193    /// Batch size while creating new batches
194    /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
195    pub batch_size: Option<usize>,
196    /// Expression adapter used to adapt filters and projections that are pushed down into the scan
197    /// from the logical schema to the physical schema of the file.
198    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
199}
200
201/// A builder for [`FileScanConfig`]'s.
202///
203/// Example:
204///
205/// ```rust
206/// # use std::sync::Arc;
207/// # use arrow::datatypes::{DataType, Field, Schema};
208/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig};
209/// # use datafusion_datasource::file_compression_type::FileCompressionType;
210/// # use datafusion_datasource::file_groups::FileGroup;
211/// # use datafusion_datasource::PartitionedFile;
212/// # use datafusion_execution::object_store::ObjectStoreUrl;
213/// # use datafusion_common::Statistics;
214/// # use datafusion_datasource::file::FileSource;
215///
216/// # fn main() {
217/// # fn with_source(file_source: Arc<dyn FileSource>) {
218///     // Create a schema for our Parquet files
219///     let schema = Arc::new(Schema::new(vec![
220///         Field::new("id", DataType::Int32, false),
221///         Field::new("value", DataType::Utf8, false),
222///     ]));
223///
224///     // Create a builder for scanning Parquet files from a local filesystem
225///     let config = FileScanConfigBuilder::new(
226///         ObjectStoreUrl::local_filesystem(),
227///         schema,
228///         file_source,
229///     )
230///     // Set a limit of 1000 rows
231///     .with_limit(Some(1000))
232///     // Project only the first column
233///     .with_projection(Some(vec![0]))
234///     // Add partition columns
235///     .with_table_partition_cols(vec![
236///         Field::new("date", DataType::Utf8, false),
237///     ])
238///     // Add a file group with two files
239///     .with_file_group(FileGroup::new(vec![
240///         PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024),
241///         PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048),
242///     ]))
243///     // Set compression type
244///     .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
245///     // Build the final config
246///     .build();
247/// # }
248/// # }
249/// ```
250#[derive(Clone)]
251pub struct FileScanConfigBuilder {
252    object_store_url: ObjectStoreUrl,
253    /// Table schema before any projections or partition columns are applied.
254    ///
255    /// This schema is used to read the files, but is **not** necessarily the
256    /// schema of the physical files. Rather this is the schema that the
257    /// physical file schema will be mapped onto, and the schema that the
258    /// [`DataSourceExec`] will return.
259    ///
260    /// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns.
261    ///
262    /// This probably would be better named `table_schema`
263    ///
264    /// [`DataSourceExec`]: crate::source::DataSourceExec
265    file_schema: SchemaRef,
266    file_source: Arc<dyn FileSource>,
267    limit: Option<usize>,
268    projection: Option<Vec<usize>>,
269    table_partition_cols: Vec<FieldRef>,
270    constraints: Option<Constraints>,
271    file_groups: Vec<FileGroup>,
272    statistics: Option<Statistics>,
273    output_ordering: Vec<LexOrdering>,
274    file_compression_type: Option<FileCompressionType>,
275    new_lines_in_values: Option<bool>,
276    batch_size: Option<usize>,
277    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
278}
279
280impl FileScanConfigBuilder {
281    /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files.
282    ///
283    /// # Parameters:
284    /// * `object_store_url`: See [`FileScanConfig::object_store_url`]
285    /// * `file_schema`: See [`FileScanConfig::file_schema`]
286    /// * `file_source`: See [`FileScanConfig::file_source`]
287    pub fn new(
288        object_store_url: ObjectStoreUrl,
289        file_schema: SchemaRef,
290        file_source: Arc<dyn FileSource>,
291    ) -> Self {
292        Self {
293            object_store_url,
294            file_schema,
295            file_source,
296            file_groups: vec![],
297            statistics: None,
298            output_ordering: vec![],
299            file_compression_type: None,
300            new_lines_in_values: None,
301            limit: None,
302            projection: None,
303            table_partition_cols: vec![],
304            constraints: None,
305            batch_size: None,
306            expr_adapter_factory: None,
307        }
308    }
309
310    /// Set the maximum number of records to read from this plan. If `None`,
311    /// all records after filtering are returned.
312    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
313        self.limit = limit;
314        self
315    }
316
317    /// Set the file source for scanning files.
318    ///
319    /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
320    /// after the builder has been created.
321    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
322        self.file_source = file_source;
323        self
324    }
325
326    /// Set the columns on which to project the data. Indexes that are higher than the
327    /// number of columns of `file_schema` refer to `table_partition_cols`.
328    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
329        self.projection = projection;
330        self
331    }
332
333    /// Set the partitioning columns
334    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
335        self.table_partition_cols = table_partition_cols
336            .into_iter()
337            .map(|f| Arc::new(f) as FieldRef)
338            .collect();
339        self
340    }
341
342    /// Set the table constraints
343    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
344        self.constraints = Some(constraints);
345        self
346    }
347
348    /// Set the estimated overall statistics of the files, taking `filters` into account.
349    /// Defaults to [`Statistics::new_unknown`].
350    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
351        self.statistics = Some(statistics);
352        self
353    }
354
355    /// Set the list of files to be processed, grouped into partitions.
356    ///
357    /// Each file must have a schema of `file_schema` or a subset. If
358    /// a particular file has a subset, the missing columns are
359    /// padded with NULLs.
360    ///
361    /// DataFusion may attempt to read each partition of files
362    /// concurrently, however files *within* a partition will be read
363    /// sequentially, one after the next.
364    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
365        self.file_groups = file_groups;
366        self
367    }
368
369    /// Add a new file group
370    ///
371    /// See [`Self::with_file_groups`] for more information
372    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
373        self.file_groups.push(file_group);
374        self
375    }
376
377    /// Add a file as a single group
378    ///
379    /// See [`Self::with_file_groups`] for more information.
380    pub fn with_file(self, file: PartitionedFile) -> Self {
381        self.with_file_group(FileGroup::new(vec![file]))
382    }
383
384    /// Set the output ordering of the files
385    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
386        self.output_ordering = output_ordering;
387        self
388    }
389
390    /// Set the file compression type
391    pub fn with_file_compression_type(
392        mut self,
393        file_compression_type: FileCompressionType,
394    ) -> Self {
395        self.file_compression_type = Some(file_compression_type);
396        self
397    }
398
399    /// Set whether new lines in values are supported for CSVOptions
400    ///
401    /// Parsing newlines in quoted values may be affected by execution behaviour such as
402    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
403    /// parsed successfully, which may reduce performance.
404    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
405        self.new_lines_in_values = Some(new_lines_in_values);
406        self
407    }
408
409    /// Set the batch_size property
410    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
411        self.batch_size = batch_size;
412        self
413    }
414
415    /// Register an expression adapter used to adapt filters and projections that are pushed down into the scan
416    /// from the logical schema to the physical schema of the file.
417    /// This can include things like:
418    /// - Column ordering changes
419    /// - Handling of missing columns
420    /// - Rewriting expression to use pre-computed values or file format specific optimizations
421    pub fn with_expr_adapter(
422        mut self,
423        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
424    ) -> Self {
425        self.expr_adapter_factory = expr_adapter;
426        self
427    }
428
429    /// Build the final [`FileScanConfig`] with all the configured settings.
430    ///
431    /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
432    /// Any unset optional fields will use their default values.
433    pub fn build(self) -> FileScanConfig {
434        let Self {
435            object_store_url,
436            file_schema,
437            file_source,
438            limit,
439            projection,
440            table_partition_cols,
441            constraints,
442            file_groups,
443            statistics,
444            output_ordering,
445            file_compression_type,
446            new_lines_in_values,
447            batch_size,
448            expr_adapter_factory: expr_adapter,
449        } = self;
450
451        let constraints = constraints.unwrap_or_default();
452        let statistics =
453            statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
454
455        let file_source = file_source
456            .with_statistics(statistics.clone())
457            .with_schema(Arc::clone(&file_schema));
458        let file_compression_type =
459            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
460        let new_lines_in_values = new_lines_in_values.unwrap_or(false);
461
462        FileScanConfig {
463            object_store_url,
464            file_schema,
465            file_source,
466            limit,
467            projection,
468            table_partition_cols,
469            constraints,
470            file_groups,
471            output_ordering,
472            file_compression_type,
473            new_lines_in_values,
474            batch_size,
475            expr_adapter_factory: expr_adapter,
476        }
477    }
478}
479
480impl From<FileScanConfig> for FileScanConfigBuilder {
481    fn from(config: FileScanConfig) -> Self {
482        Self {
483            object_store_url: config.object_store_url,
484            file_schema: config.file_schema,
485            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
486            file_groups: config.file_groups,
487            statistics: config.file_source.statistics().ok(),
488            output_ordering: config.output_ordering,
489            file_compression_type: Some(config.file_compression_type),
490            new_lines_in_values: Some(config.new_lines_in_values),
491            limit: config.limit,
492            projection: config.projection,
493            table_partition_cols: config.table_partition_cols,
494            constraints: Some(config.constraints),
495            batch_size: config.batch_size,
496            expr_adapter_factory: config.expr_adapter_factory,
497        }
498    }
499}
500
501impl DataSource for FileScanConfig {
502    fn open(
503        &self,
504        partition: usize,
505        context: Arc<TaskContext>,
506    ) -> Result<SendableRecordBatchStream> {
507        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
508        let batch_size = self
509            .batch_size
510            .unwrap_or_else(|| context.session_config().batch_size());
511
512        let source = self
513            .file_source
514            .with_batch_size(batch_size)
515            .with_projection(self);
516
517        let opener = source.create_file_opener(object_store, self, partition);
518
519        let stream = FileStream::new(self, partition, opener, source.metrics())?;
520        Ok(Box::pin(cooperative(stream)))
521    }
522
523    fn as_any(&self) -> &dyn Any {
524        self
525    }
526
527    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
528        match t {
529            DisplayFormatType::Default | DisplayFormatType::Verbose => {
530                let schema = self.projected_schema();
531                let orderings = get_projected_output_ordering(self, &schema);
532
533                write!(f, "file_groups=")?;
534                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
535
536                if !schema.fields().is_empty() {
537                    write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
538                }
539
540                if let Some(limit) = self.limit {
541                    write!(f, ", limit={limit}")?;
542                }
543
544                display_orderings(f, &orderings)?;
545
546                if !self.constraints.is_empty() {
547                    write!(f, ", {}", self.constraints)?;
548                }
549
550                self.fmt_file_source(t, f)
551            }
552            DisplayFormatType::TreeRender => {
553                writeln!(f, "format={}", self.file_source.file_type())?;
554                self.file_source.fmt_extra(t, f)?;
555                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
556                writeln!(f, "files={num_files}")?;
557                Ok(())
558            }
559        }
560    }
561
562    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
563    fn repartitioned(
564        &self,
565        target_partitions: usize,
566        repartition_file_min_size: usize,
567        output_ordering: Option<LexOrdering>,
568    ) -> Result<Option<Arc<dyn DataSource>>> {
569        let source = self.file_source.repartitioned(
570            target_partitions,
571            repartition_file_min_size,
572            output_ordering,
573            self,
574        )?;
575
576        Ok(source.map(|s| Arc::new(s) as _))
577    }
578
579    fn output_partitioning(&self) -> Partitioning {
580        Partitioning::UnknownPartitioning(self.file_groups.len())
581    }
582
583    fn eq_properties(&self) -> EquivalenceProperties {
584        let (schema, constraints, _, orderings) = self.project();
585        let mut eq_properties =
586            EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings)
587                .with_constraints(constraints);
588        if let Some(filter) = self.file_source.filter() {
589            // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
590            // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
591            match reassign_predicate_columns(filter, &schema, true) {
592                Ok(filter) => {
593                    match Self::add_filter_equivalence_info(filter, &mut eq_properties) {
594                        Ok(()) => {}
595                        Err(e) => {
596                            warn!("Failed to add filter equivalence info: {e}");
597                            #[cfg(debug_assertions)]
598                            panic!("Failed to add filter equivalence info: {e}");
599                        }
600                    }
601                }
602                Err(e) => {
603                    warn!("Failed to reassign predicate columns: {e}");
604                    #[cfg(debug_assertions)]
605                    panic!("Failed to reassign predicate columns: {e}");
606                }
607            };
608        }
609        eq_properties
610    }
611
612    fn scheduling_type(&self) -> SchedulingType {
613        SchedulingType::Cooperative
614    }
615
616    fn statistics(&self) -> Result<Statistics> {
617        Ok(self.projected_stats())
618    }
619
620    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
621        let source = FileScanConfigBuilder::from(self.clone())
622            .with_limit(limit)
623            .build();
624        Some(Arc::new(source))
625    }
626
627    fn fetch(&self) -> Option<usize> {
628        self.limit
629    }
630
631    fn metrics(&self) -> ExecutionPlanMetricsSet {
632        self.file_source.metrics().clone()
633    }
634
635    fn try_swapping_with_projection(
636        &self,
637        projection: &[ProjectionExpr],
638    ) -> Result<Option<Arc<dyn DataSource>>> {
639        // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
640
641        // Must be all column references, with no table partition columns (which can not be projected)
642        let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
643            proj_expr
644                .expr
645                .as_any()
646                .downcast_ref::<Column>()
647                .map(|expr| expr.index() >= self.file_schema.fields().len())
648                .unwrap_or(false)
649        });
650
651        // If there is any non-column or alias-carrier expression, Projection should not be removed.
652        let no_aliases = all_alias_free_columns(projection);
653
654        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
655            let file_scan = self.clone();
656            let source = Arc::clone(&file_scan.file_source);
657            let new_projections = new_projections_for_columns(
658                projection,
659                &file_scan
660                    .projection
661                    .clone()
662                    .unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
663            );
664
665            Arc::new(
666                FileScanConfigBuilder::from(file_scan)
667                    // Assign projected statistics to source
668                    .with_projection(Some(new_projections))
669                    .with_source(source)
670                    .build(),
671            ) as _
672        }))
673    }
674
675    fn try_pushdown_filters(
676        &self,
677        filters: Vec<Arc<dyn PhysicalExpr>>,
678        config: &ConfigOptions,
679    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
680        let result = self.file_source.try_pushdown_filters(filters, config)?;
681        match result.updated_node {
682            Some(new_file_source) => {
683                let file_scan_config = FileScanConfigBuilder::from(self.clone())
684                    .with_source(new_file_source)
685                    .build();
686                Ok(FilterPushdownPropagation {
687                    filters: result.filters,
688                    updated_node: Some(Arc::new(file_scan_config) as _),
689                })
690            }
691            None => {
692                // If the file source does not support filter pushdown, return the original config
693                Ok(FilterPushdownPropagation {
694                    filters: result.filters,
695                    updated_node: None,
696                })
697            }
698        }
699    }
700}
701
702impl FileScanConfig {
703    fn projection_indices(&self) -> Vec<usize> {
704        match &self.projection {
705            Some(proj) => proj.clone(),
706            None => (0..self.file_schema.fields().len()
707                + self.table_partition_cols.len())
708                .collect(),
709        }
710    }
711
712    pub fn projected_stats(&self) -> Statistics {
713        let statistics = self.file_source.statistics().unwrap();
714
715        let table_cols_stats = self
716            .projection_indices()
717            .into_iter()
718            .map(|idx| {
719                if idx < self.file_schema.fields().len() {
720                    statistics.column_statistics[idx].clone()
721                } else {
722                    // TODO provide accurate stat for partition column (#1186)
723                    ColumnStatistics::new_unknown()
724                }
725            })
726            .collect();
727
728        Statistics {
729            num_rows: statistics.num_rows,
730            // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
731            total_byte_size: statistics.total_byte_size,
732            column_statistics: table_cols_stats,
733        }
734    }
735
736    pub fn projected_schema(&self) -> Arc<Schema> {
737        let table_fields: Vec<_> = self
738            .projection_indices()
739            .into_iter()
740            .map(|idx| {
741                if idx < self.file_schema.fields().len() {
742                    self.file_schema.field(idx).clone()
743                } else {
744                    let partition_idx = idx - self.file_schema.fields().len();
745                    Arc::unwrap_or_clone(Arc::clone(
746                        &self.table_partition_cols[partition_idx],
747                    ))
748                }
749            })
750            .collect();
751
752        Arc::new(Schema::new_with_metadata(
753            table_fields,
754            self.file_schema.metadata().clone(),
755        ))
756    }
757
758    fn add_filter_equivalence_info(
759        filter: Arc<dyn PhysicalExpr>,
760        eq_properties: &mut EquivalenceProperties,
761    ) -> Result<()> {
762        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
763        for (lhs, rhs) in equal_pairs {
764            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
765        }
766        Ok(())
767    }
768
769    pub fn projected_constraints(&self) -> Constraints {
770        let indexes = self.projection_indices();
771        self.constraints.project(&indexes).unwrap_or_default()
772    }
773
774    /// Specifies whether newlines in (quoted) values are supported.
775    ///
776    /// Parsing newlines in quoted values may be affected by execution behaviour such as
777    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
778    /// parsed successfully, which may reduce performance.
779    ///
780    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
781    pub fn newlines_in_values(&self) -> bool {
782        self.new_lines_in_values
783    }
784
785    /// Project the schema, constraints, and the statistics on the given column indices
786    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
787        if self.projection.is_none() && self.table_partition_cols.is_empty() {
788            return (
789                Arc::clone(&self.file_schema),
790                self.constraints.clone(),
791                self.file_source.statistics().unwrap().clone(),
792                self.output_ordering.clone(),
793            );
794        }
795
796        let schema = self.projected_schema();
797        let constraints = self.projected_constraints();
798        let stats = self.projected_stats();
799
800        let output_ordering = get_projected_output_ordering(self, &schema);
801
802        (schema, constraints, stats, output_ordering)
803    }
804
805    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
806        self.projection.as_ref().map(|p| {
807            p.iter()
808                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
809                .map(|col_idx| self.file_schema.field(*col_idx).name())
810                .cloned()
811                .collect()
812        })
813    }
814
815    /// Projects only file schema, ignoring partition columns
816    pub fn projected_file_schema(&self) -> SchemaRef {
817        let fields = self.file_column_projection_indices().map(|indices| {
818            indices
819                .iter()
820                .map(|col_idx| self.file_schema.field(*col_idx))
821                .cloned()
822                .collect::<Vec<_>>()
823        });
824
825        fields.map_or_else(
826            || Arc::clone(&self.file_schema),
827            |f| {
828                Arc::new(Schema::new_with_metadata(
829                    f,
830                    self.file_schema.metadata.clone(),
831                ))
832            },
833        )
834    }
835
836    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
837        self.projection.as_ref().map(|p| {
838            p.iter()
839                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
840                .copied()
841                .collect()
842        })
843    }
844
845    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
846    ///
847    /// The method distributes files across a target number of partitions while ensuring
848    /// files within each partition maintain sort order based on their min/max statistics.
849    ///
850    /// The algorithm works by:
851    /// 1. Takes files sorted by minimum values
852    /// 2. For each file:
853    ///   - Finds eligible groups (empty or where file's min > group's last max)
854    ///   - Selects the smallest eligible group
855    ///   - Creates a new group if needed
856    ///
857    /// # Parameters
858    /// * `table_schema`: Schema containing information about the columns
859    /// * `file_groups`: The original file groups to split
860    /// * `sort_order`: The lexicographical ordering to maintain within each group
861    /// * `target_partitions`: The desired number of output partitions
862    ///
863    /// # Returns
864    /// A new set of file groups, where files within each group are non-overlapping with respect to
865    /// their min/max statistics and maintain the specified sort order.
866    pub fn split_groups_by_statistics_with_target_partitions(
867        table_schema: &SchemaRef,
868        file_groups: &[FileGroup],
869        sort_order: &LexOrdering,
870        target_partitions: usize,
871    ) -> Result<Vec<FileGroup>> {
872        if target_partitions == 0 {
873            return Err(DataFusionError::Internal(
874                "target_partitions must be greater than 0".to_string(),
875            ));
876        }
877
878        let flattened_files = file_groups
879            .iter()
880            .flat_map(FileGroup::iter)
881            .collect::<Vec<_>>();
882
883        if flattened_files.is_empty() {
884            return Ok(vec![]);
885        }
886
887        let statistics = MinMaxStatistics::new_from_files(
888            sort_order,
889            table_schema,
890            None,
891            flattened_files.iter().copied(),
892        )?;
893
894        let indices_sorted_by_min = statistics.min_values_sorted();
895
896        // Initialize with target_partitions empty groups
897        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
898
899        for (idx, min) in indices_sorted_by_min {
900            if let Some((_, group)) = file_groups_indices
901                .iter_mut()
902                .enumerate()
903                .filter(|(_, group)| {
904                    group.is_empty()
905                        || min
906                            > statistics
907                                .max(*group.last().expect("groups should not be empty"))
908                })
909                .min_by_key(|(_, group)| group.len())
910            {
911                group.push(idx);
912            } else {
913                // Create a new group if no existing group fits
914                file_groups_indices.push(vec![idx]);
915            }
916        }
917
918        // Remove any empty groups
919        file_groups_indices.retain(|group| !group.is_empty());
920
921        // Assemble indices back into groups of PartitionedFiles
922        Ok(file_groups_indices
923            .into_iter()
924            .map(|file_group_indices| {
925                FileGroup::new(
926                    file_group_indices
927                        .into_iter()
928                        .map(|idx| flattened_files[idx].clone())
929                        .collect(),
930                )
931            })
932            .collect())
933    }
934
935    /// Attempts to do a bin-packing on files into file groups, such that any two files
936    /// in a file group are ordered and non-overlapping with respect to their statistics.
937    /// It will produce the smallest number of file groups possible.
938    pub fn split_groups_by_statistics(
939        table_schema: &SchemaRef,
940        file_groups: &[FileGroup],
941        sort_order: &LexOrdering,
942    ) -> Result<Vec<FileGroup>> {
943        let flattened_files = file_groups
944            .iter()
945            .flat_map(FileGroup::iter)
946            .collect::<Vec<_>>();
947        // First Fit:
948        // * Choose the first file group that a file can be placed into.
949        // * If it fits into no existing file groups, create a new one.
950        //
951        // By sorting files by min values and then applying first-fit bin packing,
952        // we can produce the smallest number of file groups such that
953        // files within a group are in order and non-overlapping.
954        //
955        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
956        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
957
958        if flattened_files.is_empty() {
959            return Ok(vec![]);
960        }
961
962        let statistics = MinMaxStatistics::new_from_files(
963            sort_order,
964            table_schema,
965            None,
966            flattened_files.iter().copied(),
967        )
968        .map_err(|e| {
969            e.context("construct min/max statistics for split_groups_by_statistics")
970        })?;
971
972        let indices_sorted_by_min = statistics.min_values_sorted();
973        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
974
975        for (idx, min) in indices_sorted_by_min {
976            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
977                // If our file is non-overlapping and comes _after_ the last file,
978                // it fits in this file group.
979                min > statistics.max(
980                    *group
981                        .last()
982                        .expect("groups should be nonempty at construction"),
983                )
984            });
985            match file_group_to_insert {
986                Some(group) => group.push(idx),
987                None => file_groups_indices.push(vec![idx]),
988            }
989        }
990
991        // Assemble indices back into groups of PartitionedFiles
992        Ok(file_groups_indices
993            .into_iter()
994            .map(|file_group_indices| {
995                file_group_indices
996                    .into_iter()
997                    .map(|idx| flattened_files[idx].clone())
998                    .collect()
999            })
1000            .collect())
1001    }
1002
1003    /// Write the data_type based on file_source
1004    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1005        write!(f, ", file_type={}", self.file_source.file_type())?;
1006        self.file_source.fmt_extra(t, f)
1007    }
1008
1009    /// Returns the file_source
1010    pub fn file_source(&self) -> &Arc<dyn FileSource> {
1011        &self.file_source
1012    }
1013}
1014
1015impl Debug for FileScanConfig {
1016    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1017        write!(f, "FileScanConfig {{")?;
1018        write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1019
1020        write!(
1021            f,
1022            "statistics={:?}, ",
1023            self.file_source.statistics().unwrap()
1024        )?;
1025
1026        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1027        write!(f, "}}")
1028    }
1029}
1030
1031impl DisplayAs for FileScanConfig {
1032    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1033        let schema = self.projected_schema();
1034        let orderings = get_projected_output_ordering(self, &schema);
1035
1036        write!(f, "file_groups=")?;
1037        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1038
1039        if !schema.fields().is_empty() {
1040            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1041        }
1042
1043        if let Some(limit) = self.limit {
1044            write!(f, ", limit={limit}")?;
1045        }
1046
1047        display_orderings(f, &orderings)?;
1048
1049        if !self.constraints.is_empty() {
1050            write!(f, ", {}", self.constraints)?;
1051        }
1052
1053        Ok(())
1054    }
1055}
1056
1057/// A helper that projects partition columns into the file record batches.
1058///
1059/// One interesting trick is the usage of a cache for the key buffers of the partition column
1060/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
1061/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
1062/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
1063pub struct PartitionColumnProjector {
1064    /// An Arrow buffer initialized to zeros that represents the key array of all partition
1065    /// columns (partition columns are materialized by dictionary arrays with only one
1066    /// value in the dictionary, thus all the keys are equal to zero).
1067    key_buffer_cache: ZeroBufferGenerators,
1068    /// Mapping between the indexes in the list of partition columns and the target
1069    /// schema. Sorted by index in the target schema so that we can iterate on it to
1070    /// insert the partition columns in the target record batch.
1071    projected_partition_indexes: Vec<(usize, usize)>,
1072    /// The schema of the table once the projection was applied.
1073    projected_schema: SchemaRef,
1074}
1075
1076impl PartitionColumnProjector {
1077    // Create a projector to insert the partitioning columns into batches read from files
1078    // - `projected_schema`: the target schema with both file and partitioning columns
1079    // - `table_partition_cols`: all the partitioning column names
1080    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
1081        let mut idx_map = HashMap::new();
1082        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
1083            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
1084                idx_map.insert(partition_idx, schema_idx);
1085            }
1086        }
1087
1088        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
1089        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
1090
1091        Self {
1092            projected_partition_indexes,
1093            key_buffer_cache: Default::default(),
1094            projected_schema,
1095        }
1096    }
1097
1098    // Transform the batch read from the file by inserting the partitioning columns
1099    // to the right positions as deduced from `projected_schema`
1100    // - `file_batch`: batch read from the file, with internal projection applied
1101    // - `partition_values`: the list of partition values, one for each partition column
1102    pub fn project(
1103        &mut self,
1104        file_batch: RecordBatch,
1105        partition_values: &[ScalarValue],
1106    ) -> Result<RecordBatch> {
1107        let expected_cols =
1108            self.projected_schema.fields().len() - self.projected_partition_indexes.len();
1109
1110        if file_batch.columns().len() != expected_cols {
1111            return exec_err!(
1112                "Unexpected batch schema from file, expected {} cols but got {}",
1113                expected_cols,
1114                file_batch.columns().len()
1115            );
1116        }
1117
1118        let mut cols = file_batch.columns().to_vec();
1119        for &(pidx, sidx) in &self.projected_partition_indexes {
1120            let p_value =
1121                partition_values
1122                    .get(pidx)
1123                    .ok_or(DataFusionError::Execution(
1124                        "Invalid partitioning found on disk".to_string(),
1125                    ))?;
1126
1127            let mut partition_value = Cow::Borrowed(p_value);
1128
1129            // check if user forgot to dict-encode the partition value
1130            let field = self.projected_schema.field(sidx);
1131            let expected_data_type = field.data_type();
1132            let actual_data_type = partition_value.data_type();
1133            if let DataType::Dictionary(key_type, _) = expected_data_type {
1134                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
1135                    warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
1136                    partition_value = Cow::Owned(ScalarValue::Dictionary(
1137                        key_type.clone(),
1138                        Box::new(partition_value.as_ref().clone()),
1139                    ));
1140                }
1141            }
1142
1143            cols.insert(
1144                sidx,
1145                create_output_array(
1146                    &mut self.key_buffer_cache,
1147                    partition_value.as_ref(),
1148                    file_batch.num_rows(),
1149                )?,
1150            )
1151        }
1152
1153        RecordBatch::try_new_with_options(
1154            Arc::clone(&self.projected_schema),
1155            cols,
1156            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
1157        )
1158        .map_err(Into::into)
1159    }
1160}
1161
1162#[derive(Debug, Default)]
1163struct ZeroBufferGenerators {
1164    gen_i8: ZeroBufferGenerator<i8>,
1165    gen_i16: ZeroBufferGenerator<i16>,
1166    gen_i32: ZeroBufferGenerator<i32>,
1167    gen_i64: ZeroBufferGenerator<i64>,
1168    gen_u8: ZeroBufferGenerator<u8>,
1169    gen_u16: ZeroBufferGenerator<u16>,
1170    gen_u32: ZeroBufferGenerator<u32>,
1171    gen_u64: ZeroBufferGenerator<u64>,
1172}
1173
1174/// Generate a arrow [`Buffer`] that contains zero values.
1175#[derive(Debug, Default)]
1176struct ZeroBufferGenerator<T>
1177where
1178    T: ArrowNativeType,
1179{
1180    cache: Option<Buffer>,
1181    _t: PhantomData<T>,
1182}
1183
1184impl<T> ZeroBufferGenerator<T>
1185where
1186    T: ArrowNativeType,
1187{
1188    const SIZE: usize = size_of::<T>();
1189
1190    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
1191        match &mut self.cache {
1192            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
1193                buf.slice_with_length(0, n_vals * Self::SIZE)
1194            }
1195            _ => {
1196                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
1197                key_buffer_builder.advance(n_vals); // keys are all 0
1198                self.cache.insert(key_buffer_builder.finish()).clone()
1199            }
1200        }
1201    }
1202}
1203
1204fn create_dict_array<T>(
1205    buffer_gen: &mut ZeroBufferGenerator<T>,
1206    dict_val: &ScalarValue,
1207    len: usize,
1208    data_type: DataType,
1209) -> Result<ArrayRef>
1210where
1211    T: ArrowNativeType,
1212{
1213    let dict_vals = dict_val.to_array()?;
1214
1215    let sliced_key_buffer = buffer_gen.get_buffer(len);
1216
1217    // assemble pieces together
1218    let mut builder = ArrayData::builder(data_type)
1219        .len(len)
1220        .add_buffer(sliced_key_buffer);
1221    builder = builder.add_child_data(dict_vals.to_data());
1222    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
1223        builder.build().unwrap(),
1224    )))
1225}
1226
1227fn create_output_array(
1228    key_buffer_cache: &mut ZeroBufferGenerators,
1229    val: &ScalarValue,
1230    len: usize,
1231) -> Result<ArrayRef> {
1232    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
1233        match key_type.as_ref() {
1234            DataType::Int8 => {
1235                return create_dict_array(
1236                    &mut key_buffer_cache.gen_i8,
1237                    dict_val,
1238                    len,
1239                    val.data_type(),
1240                );
1241            }
1242            DataType::Int16 => {
1243                return create_dict_array(
1244                    &mut key_buffer_cache.gen_i16,
1245                    dict_val,
1246                    len,
1247                    val.data_type(),
1248                );
1249            }
1250            DataType::Int32 => {
1251                return create_dict_array(
1252                    &mut key_buffer_cache.gen_i32,
1253                    dict_val,
1254                    len,
1255                    val.data_type(),
1256                );
1257            }
1258            DataType::Int64 => {
1259                return create_dict_array(
1260                    &mut key_buffer_cache.gen_i64,
1261                    dict_val,
1262                    len,
1263                    val.data_type(),
1264                );
1265            }
1266            DataType::UInt8 => {
1267                return create_dict_array(
1268                    &mut key_buffer_cache.gen_u8,
1269                    dict_val,
1270                    len,
1271                    val.data_type(),
1272                );
1273            }
1274            DataType::UInt16 => {
1275                return create_dict_array(
1276                    &mut key_buffer_cache.gen_u16,
1277                    dict_val,
1278                    len,
1279                    val.data_type(),
1280                );
1281            }
1282            DataType::UInt32 => {
1283                return create_dict_array(
1284                    &mut key_buffer_cache.gen_u32,
1285                    dict_val,
1286                    len,
1287                    val.data_type(),
1288                );
1289            }
1290            DataType::UInt64 => {
1291                return create_dict_array(
1292                    &mut key_buffer_cache.gen_u64,
1293                    dict_val,
1294                    len,
1295                    val.data_type(),
1296                );
1297            }
1298            _ => {}
1299        }
1300    }
1301
1302    val.to_array_of_size(len)
1303}
1304
1305/// The various listing tables does not attempt to read all files
1306/// concurrently, instead they will read files in sequence within a
1307/// partition.  This is an important property as it allows plans to
1308/// run against 1000s of files and not try to open them all
1309/// concurrently.
1310///
1311/// However, it means if we assign more than one file to a partition
1312/// the output sort order will not be preserved as illustrated in the
1313/// following diagrams:
1314///
1315/// When only 1 file is assigned to each partition, each partition is
1316/// correctly sorted on `(A, B, C)`
1317///
1318/// ```text
1319///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
1320///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
1321///┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
1322///  │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
1323///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
1324///  │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
1325///┃                                          │                    │                     ┃
1326///  │                   │ │                    │                    │                 │
1327///┃                                          │                    │                     ┃
1328///  │                   │ │                    │                    │                 │
1329///┃                                          │                    │                     ┃
1330///  │                   │ │                    │                    │                 │
1331///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1332///     DataFusion           DataFusion           DataFusion           DataFusion
1333///┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
1334/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1335///
1336///                                      DataSourceExec
1337///```
1338///
1339/// However, when more than 1 file is assigned to each partition, each
1340/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
1341/// file is scanned, the same values for A, B and C can be repeated in
1342/// the same sorted stream
1343///
1344///```text
1345///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1346///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1347///┃   ┌───────────────┐     ┌──────────────┐ │
1348///  │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
1349///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1350///  │ └───────────────┘ │ │ └──────────────┘   ┃
1351///┃   ┌───────────────┐     ┌──────────────┐ │
1352///  │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
1353///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1354///  │ └───────────────┘ │ │ └──────────────┘   ┃
1355///┃                                          │
1356///  │                   │ │                    ┃
1357///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
1358///     DataFusion           DataFusion         ┃
1359///┃    Partition 1          Partition 2
1360/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
1361///
1362///              DataSourceExec
1363///```
1364fn get_projected_output_ordering(
1365    base_config: &FileScanConfig,
1366    projected_schema: &SchemaRef,
1367) -> Vec<LexOrdering> {
1368    let mut all_orderings = vec![];
1369    for output_ordering in &base_config.output_ordering {
1370        let mut new_ordering = vec![];
1371        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
1372            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1373                let name = col.name();
1374                if let Some((idx, _)) = projected_schema.column_with_name(name) {
1375                    // Compute the new sort expression (with correct index) after projection:
1376                    new_ordering.push(PhysicalSortExpr::new(
1377                        Arc::new(Column::new(name, idx)),
1378                        *options,
1379                    ));
1380                    continue;
1381                }
1382            }
1383            // Cannot find expression in the projected_schema, stop iterating
1384            // since rest of the orderings are violated
1385            break;
1386        }
1387
1388        let Some(new_ordering) = LexOrdering::new(new_ordering) else {
1389            continue;
1390        };
1391
1392        // Check if any file groups are not sorted
1393        if base_config.file_groups.iter().any(|group| {
1394            if group.len() <= 1 {
1395                // File groups with <= 1 files are always sorted
1396                return false;
1397            }
1398
1399            let statistics = match MinMaxStatistics::new_from_files(
1400                &new_ordering,
1401                projected_schema,
1402                base_config.projection.as_deref(),
1403                group.iter(),
1404            ) {
1405                Ok(statistics) => statistics,
1406                Err(e) => {
1407                    log::trace!("Error fetching statistics for file group: {e}");
1408                    // we can't prove that it's ordered, so we have to reject it
1409                    return true;
1410                }
1411            };
1412
1413            !statistics.is_sorted()
1414        }) {
1415            debug!(
1416                "Skipping specified output ordering {:?}. \
1417                Some file groups couldn't be determined to be sorted: {:?}",
1418                base_config.output_ordering[0], base_config.file_groups
1419            );
1420            continue;
1421        }
1422
1423        all_orderings.push(new_ordering);
1424    }
1425    all_orderings
1426}
1427
1428/// Convert type to a type suitable for use as a `ListingTable`
1429/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
1430/// a reasonable trade off between a reasonable number of partition
1431/// values and space efficiency.
1432///
1433/// This use this to specify types for partition columns. However
1434/// you MAY also choose not to dictionary-encode the data or to use a
1435/// different dictionary type.
1436///
1437/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same say.
1438pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1439    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1440}
1441
1442/// Convert a [`ScalarValue`] of partition columns to a type, as
1443/// described in the documentation of [`wrap_partition_type_in_dict`],
1444/// which can wrap the types.
1445pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1446    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1447}
1448
1449#[cfg(test)]
1450mod tests {
1451    use super::*;
1452    use crate::{
1453        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1454        verify_sort_integrity,
1455    };
1456
1457    use arrow::array::{Int32Array, RecordBatch};
1458    use datafusion_common::stats::Precision;
1459    use datafusion_common::{assert_batches_eq, internal_err};
1460    use datafusion_expr::SortExpr;
1461    use datafusion_physical_expr::create_physical_sort_expr;
1462
1463    /// Returns the column names on the schema
1464    pub fn columns(schema: &Schema) -> Vec<String> {
1465        schema.fields().iter().map(|f| f.name().clone()).collect()
1466    }
1467
1468    #[test]
1469    fn physical_plan_config_no_projection() {
1470        let file_schema = aggr_test_schema();
1471        let conf = config_for_projection(
1472            Arc::clone(&file_schema),
1473            None,
1474            Statistics::new_unknown(&file_schema),
1475            to_partition_cols(vec![(
1476                "date".to_owned(),
1477                wrap_partition_type_in_dict(DataType::Utf8),
1478            )]),
1479        );
1480
1481        let (proj_schema, _, proj_statistics, _) = conf.project();
1482        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1483        assert_eq!(
1484            proj_schema.field(file_schema.fields().len()).name(),
1485            "date",
1486            "partition columns are the last columns"
1487        );
1488        assert_eq!(
1489            proj_statistics.column_statistics.len(),
1490            file_schema.fields().len() + 1
1491        );
1492        // TODO implement tests for partition column statistics once implemented
1493
1494        let col_names = conf.projected_file_column_names();
1495        assert_eq!(col_names, None);
1496
1497        let col_indices = conf.file_column_projection_indices();
1498        assert_eq!(col_indices, None);
1499    }
1500
1501    #[test]
1502    fn physical_plan_config_no_projection_tab_cols_as_field() {
1503        let file_schema = aggr_test_schema();
1504
1505        // make a table_partition_col as a field
1506        let table_partition_col =
1507            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1508                .with_metadata(HashMap::from_iter(vec![(
1509                    "key_whatever".to_owned(),
1510                    "value_whatever".to_owned(),
1511                )]));
1512
1513        let conf = config_for_projection(
1514            Arc::clone(&file_schema),
1515            None,
1516            Statistics::new_unknown(&file_schema),
1517            vec![table_partition_col.clone()],
1518        );
1519
1520        // verify the proj_schema includes the last column and exactly the same the field it is defined
1521        let proj_schema = conf.projected_schema();
1522        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1523        assert_eq!(
1524            *proj_schema.field(file_schema.fields().len()),
1525            table_partition_col,
1526            "partition columns are the last columns and ust have all values defined in created field"
1527        );
1528    }
1529
1530    #[test]
1531    fn physical_plan_config_with_projection() {
1532        let file_schema = aggr_test_schema();
1533        let conf = config_for_projection(
1534            Arc::clone(&file_schema),
1535            Some(vec![file_schema.fields().len(), 0]),
1536            Statistics {
1537                num_rows: Precision::Inexact(10),
1538                // assign the column index to distinct_count to help assert
1539                // the source statistic after the projection
1540                column_statistics: (0..file_schema.fields().len())
1541                    .map(|i| ColumnStatistics {
1542                        distinct_count: Precision::Inexact(i),
1543                        ..Default::default()
1544                    })
1545                    .collect(),
1546                total_byte_size: Precision::Absent,
1547            },
1548            to_partition_cols(vec![(
1549                "date".to_owned(),
1550                wrap_partition_type_in_dict(DataType::Utf8),
1551            )]),
1552        );
1553
1554        let (proj_schema, _, proj_statistics, _) = conf.project();
1555        assert_eq!(
1556            columns(&proj_schema),
1557            vec!["date".to_owned(), "c1".to_owned()]
1558        );
1559        let proj_stat_cols = proj_statistics.column_statistics;
1560        assert_eq!(proj_stat_cols.len(), 2);
1561        // TODO implement tests for proj_stat_cols[0] once partition column
1562        // statistics are implemented
1563        assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));
1564
1565        let col_names = conf.projected_file_column_names();
1566        assert_eq!(col_names, Some(vec!["c1".to_owned()]));
1567
1568        let col_indices = conf.file_column_projection_indices();
1569        assert_eq!(col_indices, Some(vec![0]));
1570    }
1571
1572    #[test]
1573    fn partition_column_projector() {
1574        let file_batch = build_table_i32(
1575            ("a", &vec![0, 1, 2]),
1576            ("b", &vec![-2, -1, 0]),
1577            ("c", &vec![10, 11, 12]),
1578        );
1579        let partition_cols = vec![
1580            (
1581                "year".to_owned(),
1582                wrap_partition_type_in_dict(DataType::Utf8),
1583            ),
1584            (
1585                "month".to_owned(),
1586                wrap_partition_type_in_dict(DataType::Utf8),
1587            ),
1588            (
1589                "day".to_owned(),
1590                wrap_partition_type_in_dict(DataType::Utf8),
1591            ),
1592        ];
1593        // create a projected schema
1594        let statistics = Statistics {
1595            num_rows: Precision::Inexact(3),
1596            total_byte_size: Precision::Absent,
1597            column_statistics: Statistics::unknown_column(&file_batch.schema()),
1598        };
1599
1600        let conf = config_for_projection(
1601            file_batch.schema(),
1602            // keep all cols from file and 2 from partitioning
1603            Some(vec![
1604                0,
1605                1,
1606                2,
1607                file_batch.schema().fields().len(),
1608                file_batch.schema().fields().len() + 2,
1609            ]),
1610            statistics.clone(),
1611            to_partition_cols(partition_cols.clone()),
1612        );
1613
1614        let source_statistics = conf.file_source.statistics().unwrap();
1615        let conf_stats = conf.statistics().unwrap();
1616
1617        // projection should be reflected in the file source statistics
1618        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
1619
1620        // 3 original statistics + 2 partition statistics
1621        assert_eq!(conf_stats.column_statistics.len(), 5);
1622
1623        // file statics should not be modified
1624        assert_eq!(source_statistics, statistics);
1625        assert_eq!(source_statistics.column_statistics.len(), 3);
1626
1627        let proj_schema = conf.projected_schema();
1628        // created a projector for that projected schema
1629        let mut proj = PartitionColumnProjector::new(
1630            proj_schema,
1631            &partition_cols
1632                .iter()
1633                .map(|x| x.0.clone())
1634                .collect::<Vec<_>>(),
1635        );
1636
1637        // project first batch
1638        let projected_batch = proj
1639            .project(
1640                // file_batch is ok here because we kept all the file cols in the projection
1641                file_batch,
1642                &[
1643                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1644                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1645                    wrap_partition_value_in_dict(ScalarValue::from("26")),
1646                ],
1647            )
1648            .expect("Projection of partition columns into record batch failed");
1649        let expected = [
1650            "+---+----+----+------+-----+",
1651            "| a | b  | c  | year | day |",
1652            "+---+----+----+------+-----+",
1653            "| 0 | -2 | 10 | 2021 | 26  |",
1654            "| 1 | -1 | 11 | 2021 | 26  |",
1655            "| 2 | 0  | 12 | 2021 | 26  |",
1656            "+---+----+----+------+-----+",
1657        ];
1658        assert_batches_eq!(expected, &[projected_batch]);
1659
1660        // project another batch that is larger than the previous one
1661        let file_batch = build_table_i32(
1662            ("a", &vec![5, 6, 7, 8, 9]),
1663            ("b", &vec![-10, -9, -8, -7, -6]),
1664            ("c", &vec![12, 13, 14, 15, 16]),
1665        );
1666        let projected_batch = proj
1667            .project(
1668                // file_batch is ok here because we kept all the file cols in the projection
1669                file_batch,
1670                &[
1671                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1672                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1673                    wrap_partition_value_in_dict(ScalarValue::from("27")),
1674                ],
1675            )
1676            .expect("Projection of partition columns into record batch failed");
1677        let expected = [
1678            "+---+-----+----+------+-----+",
1679            "| a | b   | c  | year | day |",
1680            "+---+-----+----+------+-----+",
1681            "| 5 | -10 | 12 | 2021 | 27  |",
1682            "| 6 | -9  | 13 | 2021 | 27  |",
1683            "| 7 | -8  | 14 | 2021 | 27  |",
1684            "| 8 | -7  | 15 | 2021 | 27  |",
1685            "| 9 | -6  | 16 | 2021 | 27  |",
1686            "+---+-----+----+------+-----+",
1687        ];
1688        assert_batches_eq!(expected, &[projected_batch]);
1689
1690        // project another batch that is smaller than the previous one
1691        let file_batch = build_table_i32(
1692            ("a", &vec![0, 1, 3]),
1693            ("b", &vec![2, 3, 4]),
1694            ("c", &vec![4, 5, 6]),
1695        );
1696        let projected_batch = proj
1697            .project(
1698                // file_batch is ok here because we kept all the file cols in the projection
1699                file_batch,
1700                &[
1701                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1702                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1703                    wrap_partition_value_in_dict(ScalarValue::from("28")),
1704                ],
1705            )
1706            .expect("Projection of partition columns into record batch failed");
1707        let expected = [
1708            "+---+---+---+------+-----+",
1709            "| a | b | c | year | day |",
1710            "+---+---+---+------+-----+",
1711            "| 0 | 2 | 4 | 2021 | 28  |",
1712            "| 1 | 3 | 5 | 2021 | 28  |",
1713            "| 3 | 4 | 6 | 2021 | 28  |",
1714            "+---+---+---+------+-----+",
1715        ];
1716        assert_batches_eq!(expected, &[projected_batch]);
1717
1718        // forgot to dictionary-wrap the scalar value
1719        let file_batch = build_table_i32(
1720            ("a", &vec![0, 1, 2]),
1721            ("b", &vec![-2, -1, 0]),
1722            ("c", &vec![10, 11, 12]),
1723        );
1724        let projected_batch = proj
1725            .project(
1726                // file_batch is ok here because we kept all the file cols in the projection
1727                file_batch,
1728                &[
1729                    ScalarValue::from("2021"),
1730                    ScalarValue::from("10"),
1731                    ScalarValue::from("26"),
1732                ],
1733            )
1734            .expect("Projection of partition columns into record batch failed");
1735        let expected = [
1736            "+---+----+----+------+-----+",
1737            "| a | b  | c  | year | day |",
1738            "+---+----+----+------+-----+",
1739            "| 0 | -2 | 10 | 2021 | 26  |",
1740            "| 1 | -1 | 11 | 2021 | 26  |",
1741            "| 2 | 0  | 12 | 2021 | 26  |",
1742            "+---+----+----+------+-----+",
1743        ];
1744        assert_batches_eq!(expected, &[projected_batch]);
1745    }
1746
1747    #[test]
1748    fn test_projected_file_schema_with_partition_col() {
1749        let schema = aggr_test_schema();
1750        let partition_cols = vec![
1751            (
1752                "part1".to_owned(),
1753                wrap_partition_type_in_dict(DataType::Utf8),
1754            ),
1755            (
1756                "part2".to_owned(),
1757                wrap_partition_type_in_dict(DataType::Utf8),
1758            ),
1759        ];
1760
1761        // Projected file schema for config with projection including partition column
1762        let projection = config_for_projection(
1763            schema.clone(),
1764            Some(vec![0, 3, 5, schema.fields().len()]),
1765            Statistics::new_unknown(&schema),
1766            to_partition_cols(partition_cols),
1767        )
1768        .projected_file_schema();
1769
1770        // Assert partition column filtered out in projected file schema
1771        let expected_columns = vec!["c1", "c4", "c6"];
1772        let actual_columns = projection
1773            .fields()
1774            .iter()
1775            .map(|f| f.name().clone())
1776            .collect::<Vec<_>>();
1777        assert_eq!(expected_columns, actual_columns);
1778    }
1779
1780    #[test]
1781    fn test_projected_file_schema_without_projection() {
1782        let schema = aggr_test_schema();
1783        let partition_cols = vec![
1784            (
1785                "part1".to_owned(),
1786                wrap_partition_type_in_dict(DataType::Utf8),
1787            ),
1788            (
1789                "part2".to_owned(),
1790                wrap_partition_type_in_dict(DataType::Utf8),
1791            ),
1792        ];
1793
1794        // Projected file schema for config without projection
1795        let projection = config_for_projection(
1796            schema.clone(),
1797            None,
1798            Statistics::new_unknown(&schema),
1799            to_partition_cols(partition_cols),
1800        )
1801        .projected_file_schema();
1802
1803        // Assert projected file schema is equal to file schema
1804        assert_eq!(projection.fields(), schema.fields());
1805    }
1806
1807    #[test]
1808    fn test_split_groups_by_statistics() -> Result<()> {
1809        use chrono::TimeZone;
1810        use datafusion_common::DFSchema;
1811        use datafusion_expr::execution_props::ExecutionProps;
1812        use object_store::{path::Path, ObjectMeta};
1813
1814        struct File {
1815            name: &'static str,
1816            date: &'static str,
1817            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1818        }
1819        impl File {
1820            fn new(
1821                name: &'static str,
1822                date: &'static str,
1823                statistics: Vec<Option<(f64, f64)>>,
1824            ) -> Self {
1825                Self::new_nullable(
1826                    name,
1827                    date,
1828                    statistics
1829                        .into_iter()
1830                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1831                        .collect(),
1832                )
1833            }
1834
1835            fn new_nullable(
1836                name: &'static str,
1837                date: &'static str,
1838                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1839            ) -> Self {
1840                Self {
1841                    name,
1842                    date,
1843                    statistics,
1844                }
1845            }
1846        }
1847
1848        struct TestCase {
1849            name: &'static str,
1850            file_schema: Schema,
1851            files: Vec<File>,
1852            sort: Vec<SortExpr>,
1853            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1854        }
1855
1856        use datafusion_expr::col;
1857        let cases = vec![
1858            TestCase {
1859                name: "test sort",
1860                file_schema: Schema::new(vec![Field::new(
1861                    "value".to_string(),
1862                    DataType::Float64,
1863                    false,
1864                )]),
1865                files: vec![
1866                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1867                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1868                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1869                ],
1870                sort: vec![col("value").sort(true, false)],
1871                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1872            },
1873            // same input but file '2' is in the middle
1874            // test that we still order correctly
1875            TestCase {
1876                name: "test sort with files ordered differently",
1877                file_schema: Schema::new(vec![Field::new(
1878                    "value".to_string(),
1879                    DataType::Float64,
1880                    false,
1881                )]),
1882                files: vec![
1883                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1884                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1885                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1886                ],
1887                sort: vec![col("value").sort(true, false)],
1888                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1889            },
1890            TestCase {
1891                name: "reverse sort",
1892                file_schema: Schema::new(vec![Field::new(
1893                    "value".to_string(),
1894                    DataType::Float64,
1895                    false,
1896                )]),
1897                files: vec![
1898                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1899                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1900                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1901                ],
1902                sort: vec![col("value").sort(false, true)],
1903                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1904            },
1905            TestCase {
1906                name: "nullable sort columns, nulls last",
1907                file_schema: Schema::new(vec![Field::new(
1908                    "value".to_string(),
1909                    DataType::Float64,
1910                    true,
1911                )]),
1912                files: vec![
1913                    File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
1914                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
1915                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
1916                ],
1917                sort: vec![col("value").sort(true, false)],
1918                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
1919            },
1920            TestCase {
1921                name: "nullable sort columns, nulls first",
1922                file_schema: Schema::new(vec![Field::new(
1923                    "value".to_string(),
1924                    DataType::Float64,
1925                    true,
1926                )]),
1927                files: vec![
1928                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
1929                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
1930                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
1931                ],
1932                sort: vec![col("value").sort(true, true)],
1933                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
1934            },
1935            TestCase {
1936                name: "all three non-overlapping",
1937                file_schema: Schema::new(vec![Field::new(
1938                    "value".to_string(),
1939                    DataType::Float64,
1940                    false,
1941                )]),
1942                files: vec![
1943                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1944                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
1945                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
1946                ],
1947                sort: vec![col("value").sort(true, false)],
1948                expected_result: Ok(vec![vec!["0", "1", "2"]]),
1949            },
1950            TestCase {
1951                name: "all three overlapping",
1952                file_schema: Schema::new(vec![Field::new(
1953                    "value".to_string(),
1954                    DataType::Float64,
1955                    false,
1956                )]),
1957                files: vec![
1958                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1959                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1960                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
1961                ],
1962                sort: vec![col("value").sort(true, false)],
1963                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
1964            },
1965            TestCase {
1966                name: "empty input",
1967                file_schema: Schema::new(vec![Field::new(
1968                    "value".to_string(),
1969                    DataType::Float64,
1970                    false,
1971                )]),
1972                files: vec![],
1973                sort: vec![col("value").sort(true, false)],
1974                expected_result: Ok(vec![]),
1975            },
1976            TestCase {
1977                name: "one file missing statistics",
1978                file_schema: Schema::new(vec![Field::new(
1979                    "value".to_string(),
1980                    DataType::Float64,
1981                    false,
1982                )]),
1983                files: vec![
1984                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1985                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1986                    File::new("2", "2023-01-02", vec![None]),
1987                ],
1988                sort: vec![col("value").sort(true, false)],
1989                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
1990            },
1991        ];
1992
1993        for case in cases {
1994            let table_schema = Arc::new(Schema::new(
1995                case.file_schema
1996                    .fields()
1997                    .clone()
1998                    .into_iter()
1999                    .cloned()
2000                    .chain(Some(Arc::new(Field::new(
2001                        "date".to_string(),
2002                        DataType::Utf8,
2003                        false,
2004                    ))))
2005                    .collect::<Vec<_>>(),
2006            ));
2007            let Some(sort_order) = LexOrdering::new(
2008                case.sort
2009                    .into_iter()
2010                    .map(|expr| {
2011                        create_physical_sort_expr(
2012                            &expr,
2013                            &DFSchema::try_from(Arc::clone(&table_schema))?,
2014                            &ExecutionProps::default(),
2015                        )
2016                    })
2017                    .collect::<Result<Vec<_>>>()?,
2018            ) else {
2019                return internal_err!("This test should always use an ordering");
2020            };
2021
2022            let partitioned_files = FileGroup::new(
2023                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
2024            );
2025            let result = FileScanConfig::split_groups_by_statistics(
2026                &table_schema,
2027                std::slice::from_ref(&partitioned_files),
2028                &sort_order,
2029            );
2030            let results_by_name = result
2031                .as_ref()
2032                .map(|file_groups| {
2033                    file_groups
2034                        .iter()
2035                        .map(|file_group| {
2036                            file_group
2037                                .iter()
2038                                .map(|file| {
2039                                    partitioned_files
2040                                        .iter()
2041                                        .find_map(|f| {
2042                                            if f.object_meta == file.object_meta {
2043                                                Some(
2044                                                    f.object_meta
2045                                                        .location
2046                                                        .as_ref()
2047                                                        .rsplit('/')
2048                                                        .next()
2049                                                        .unwrap()
2050                                                        .trim_end_matches(".parquet"),
2051                                                )
2052                                            } else {
2053                                                None
2054                                            }
2055                                        })
2056                                        .unwrap()
2057                                })
2058                                .collect::<Vec<_>>()
2059                        })
2060                        .collect::<Vec<_>>()
2061                })
2062                .map_err(|e| e.strip_backtrace().leak() as &'static str);
2063
2064            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
2065        }
2066
2067        return Ok(());
2068
2069        impl From<File> for PartitionedFile {
2070            fn from(file: File) -> Self {
2071                PartitionedFile {
2072                    object_meta: ObjectMeta {
2073                        location: Path::from(format!(
2074                            "data/date={}/{}.parquet",
2075                            file.date, file.name
2076                        )),
2077                        last_modified: chrono::Utc.timestamp_nanos(0),
2078                        size: 0,
2079                        e_tag: None,
2080                        version: None,
2081                    },
2082                    partition_values: vec![ScalarValue::from(file.date)],
2083                    range: None,
2084                    statistics: Some(Arc::new(Statistics {
2085                        num_rows: Precision::Absent,
2086                        total_byte_size: Precision::Absent,
2087                        column_statistics: file
2088                            .statistics
2089                            .into_iter()
2090                            .map(|stats| {
2091                                stats
2092                                    .map(|(min, max)| ColumnStatistics {
2093                                        min_value: Precision::Exact(
2094                                            ScalarValue::Float64(min),
2095                                        ),
2096                                        max_value: Precision::Exact(
2097                                            ScalarValue::Float64(max),
2098                                        ),
2099                                        ..Default::default()
2100                                    })
2101                                    .unwrap_or_default()
2102                            })
2103                            .collect::<Vec<_>>(),
2104                    })),
2105                    extensions: None,
2106                    metadata_size_hint: None,
2107                }
2108            }
2109        }
2110    }
2111
2112    // sets default for configs that play no role in projections
2113    fn config_for_projection(
2114        file_schema: SchemaRef,
2115        projection: Option<Vec<usize>>,
2116        statistics: Statistics,
2117        table_partition_cols: Vec<Field>,
2118    ) -> FileScanConfig {
2119        FileScanConfigBuilder::new(
2120            ObjectStoreUrl::parse("test:///").unwrap(),
2121            file_schema,
2122            Arc::new(MockSource::default()),
2123        )
2124        .with_projection(projection)
2125        .with_statistics(statistics)
2126        .with_table_partition_cols(table_partition_cols)
2127        .build()
2128    }
2129
2130    /// Convert partition columns from Vec<String DataType> to Vec<Field>
2131    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
2132        table_partition_cols
2133            .iter()
2134            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
2135            .collect::<Vec<_>>()
2136    }
2137
2138    /// returns record batch with 3 columns of i32 in memory
2139    pub fn build_table_i32(
2140        a: (&str, &Vec<i32>),
2141        b: (&str, &Vec<i32>),
2142        c: (&str, &Vec<i32>),
2143    ) -> RecordBatch {
2144        let schema = Schema::new(vec![
2145            Field::new(a.0, DataType::Int32, false),
2146            Field::new(b.0, DataType::Int32, false),
2147            Field::new(c.0, DataType::Int32, false),
2148        ]);
2149
2150        RecordBatch::try_new(
2151            Arc::new(schema),
2152            vec![
2153                Arc::new(Int32Array::from(a.1.clone())),
2154                Arc::new(Int32Array::from(b.1.clone())),
2155                Arc::new(Int32Array::from(c.1.clone())),
2156            ],
2157        )
2158        .unwrap()
2159    }
2160
2161    #[test]
2162    fn test_file_scan_config_builder() {
2163        let file_schema = aggr_test_schema();
2164        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2165        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2166
2167        // Create a builder with required parameters
2168        let builder = FileScanConfigBuilder::new(
2169            object_store_url.clone(),
2170            Arc::clone(&file_schema),
2171            Arc::clone(&file_source),
2172        );
2173
2174        // Build with various configurations
2175        let config = builder
2176            .with_limit(Some(1000))
2177            .with_projection(Some(vec![0, 1]))
2178            .with_table_partition_cols(vec![Field::new(
2179                "date",
2180                wrap_partition_type_in_dict(DataType::Utf8),
2181                false,
2182            )])
2183            .with_statistics(Statistics::new_unknown(&file_schema))
2184            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
2185                "test.parquet".to_string(),
2186                1024,
2187            )])])
2188            .with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
2189                Column::new("date", 0),
2190            ))]
2191            .into()])
2192            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
2193            .with_newlines_in_values(true)
2194            .build();
2195
2196        // Verify the built config has all the expected values
2197        assert_eq!(config.object_store_url, object_store_url);
2198        assert_eq!(config.file_schema, file_schema);
2199        assert_eq!(config.limit, Some(1000));
2200        assert_eq!(config.projection, Some(vec![0, 1]));
2201        assert_eq!(config.table_partition_cols.len(), 1);
2202        assert_eq!(config.table_partition_cols[0].name(), "date");
2203        assert_eq!(config.file_groups.len(), 1);
2204        assert_eq!(config.file_groups[0].len(), 1);
2205        assert_eq!(
2206            config.file_groups[0][0].object_meta.location.as_ref(),
2207            "test.parquet"
2208        );
2209        assert_eq!(
2210            config.file_compression_type,
2211            FileCompressionType::UNCOMPRESSED
2212        );
2213        assert!(config.new_lines_in_values);
2214        assert_eq!(config.output_ordering.len(), 1);
2215    }
2216
2217    #[test]
2218    fn test_file_scan_config_builder_defaults() {
2219        let file_schema = aggr_test_schema();
2220        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2221        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2222
2223        // Create a builder with only required parameters and build without any additional configurations
2224        let config = FileScanConfigBuilder::new(
2225            object_store_url.clone(),
2226            Arc::clone(&file_schema),
2227            Arc::clone(&file_source),
2228        )
2229        .build();
2230
2231        // Verify default values
2232        assert_eq!(config.object_store_url, object_store_url);
2233        assert_eq!(config.file_schema, file_schema);
2234        assert_eq!(config.limit, None);
2235        assert_eq!(config.projection, None);
2236        assert!(config.table_partition_cols.is_empty());
2237        assert!(config.file_groups.is_empty());
2238        assert_eq!(
2239            config.file_compression_type,
2240            FileCompressionType::UNCOMPRESSED
2241        );
2242        assert!(!config.new_lines_in_values);
2243        assert!(config.output_ordering.is_empty());
2244        assert!(config.constraints.is_empty());
2245
2246        // Verify statistics are set to unknown
2247        assert_eq!(
2248            config.file_source.statistics().unwrap().num_rows,
2249            Precision::Absent
2250        );
2251        assert_eq!(
2252            config.file_source.statistics().unwrap().total_byte_size,
2253            Precision::Absent
2254        );
2255        assert_eq!(
2256            config
2257                .file_source
2258                .statistics()
2259                .unwrap()
2260                .column_statistics
2261                .len(),
2262            file_schema.fields().len()
2263        );
2264        for stat in config.file_source.statistics().unwrap().column_statistics {
2265            assert_eq!(stat.distinct_count, Precision::Absent);
2266            assert_eq!(stat.min_value, Precision::Absent);
2267            assert_eq!(stat.max_value, Precision::Absent);
2268            assert_eq!(stat.null_count, Precision::Absent);
2269        }
2270    }
2271
2272    #[test]
2273    fn test_file_scan_config_builder_new_from() {
2274        let schema = aggr_test_schema();
2275        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2276        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2277        let partition_cols = vec![Field::new(
2278            "date",
2279            wrap_partition_type_in_dict(DataType::Utf8),
2280            false,
2281        )];
2282        let file = PartitionedFile::new("test_file.parquet", 100);
2283
2284        // Create a config with non-default values
2285        let original_config = FileScanConfigBuilder::new(
2286            object_store_url.clone(),
2287            Arc::clone(&schema),
2288            Arc::clone(&file_source),
2289        )
2290        .with_projection(Some(vec![0, 2]))
2291        .with_limit(Some(10))
2292        .with_table_partition_cols(partition_cols.clone())
2293        .with_file(file.clone())
2294        .with_constraints(Constraints::default())
2295        .with_newlines_in_values(true)
2296        .build();
2297
2298        // Create a new builder from the config
2299        let new_builder = FileScanConfigBuilder::from(original_config);
2300
2301        // Build a new config from this builder
2302        let new_config = new_builder.build();
2303
2304        // Verify properties match
2305        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2306        assert_eq!(new_config.object_store_url, object_store_url);
2307        assert_eq!(new_config.file_schema, schema);
2308        assert_eq!(new_config.projection, Some(vec![0, 2]));
2309        assert_eq!(new_config.limit, Some(10));
2310        assert_eq!(new_config.table_partition_cols, partition_cols);
2311        assert_eq!(new_config.file_groups.len(), 1);
2312        assert_eq!(new_config.file_groups[0].len(), 1);
2313        assert_eq!(
2314            new_config.file_groups[0][0].object_meta.location.as_ref(),
2315            "test_file.parquet"
2316        );
2317        assert_eq!(new_config.constraints, Constraints::default());
2318        assert!(new_config.new_lines_in_values);
2319    }
2320
2321    #[test]
2322    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2323        use datafusion_common::DFSchema;
2324        use datafusion_expr::{col, execution_props::ExecutionProps};
2325
2326        let schema = Arc::new(Schema::new(vec![Field::new(
2327            "value",
2328            DataType::Float64,
2329            false,
2330        )]));
2331
2332        // Setup sort expression
2333        let exec_props = ExecutionProps::new();
2334        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2335        let sort_expr = [col("value").sort(true, false)];
2336        let sort_ordering = sort_expr
2337            .map(|expr| {
2338                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
2339            })
2340            .into();
2341
2342        // Test case parameters
2343        struct TestCase {
2344            name: String,
2345            file_count: usize,
2346            overlap_factor: f64,
2347            target_partitions: usize,
2348            expected_partition_count: usize,
2349        }
2350
2351        let test_cases = vec![
2352            // Basic cases
2353            TestCase {
2354                name: "no_overlap_10_files_4_partitions".to_string(),
2355                file_count: 10,
2356                overlap_factor: 0.0,
2357                target_partitions: 4,
2358                expected_partition_count: 4,
2359            },
2360            TestCase {
2361                name: "medium_overlap_20_files_5_partitions".to_string(),
2362                file_count: 20,
2363                overlap_factor: 0.5,
2364                target_partitions: 5,
2365                expected_partition_count: 5,
2366            },
2367            TestCase {
2368                name: "high_overlap_30_files_3_partitions".to_string(),
2369                file_count: 30,
2370                overlap_factor: 0.8,
2371                target_partitions: 3,
2372                expected_partition_count: 7,
2373            },
2374            // Edge cases
2375            TestCase {
2376                name: "fewer_files_than_partitions".to_string(),
2377                file_count: 3,
2378                overlap_factor: 0.0,
2379                target_partitions: 10,
2380                expected_partition_count: 3, // Should only create as many partitions as files
2381            },
2382            TestCase {
2383                name: "single_file".to_string(),
2384                file_count: 1,
2385                overlap_factor: 0.0,
2386                target_partitions: 5,
2387                expected_partition_count: 1, // Should create only one partition
2388            },
2389            TestCase {
2390                name: "empty_files".to_string(),
2391                file_count: 0,
2392                overlap_factor: 0.0,
2393                target_partitions: 3,
2394                expected_partition_count: 0, // Empty result for empty input
2395            },
2396        ];
2397
2398        for case in test_cases {
2399            println!("Running test case: {}", case.name);
2400
2401            // Generate files using bench utility function
2402            let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2403
2404            // Call the function under test
2405            let result =
2406                FileScanConfig::split_groups_by_statistics_with_target_partitions(
2407                    &schema,
2408                    &file_groups,
2409                    &sort_ordering,
2410                    case.target_partitions,
2411                )?;
2412
2413            // Verify results
2414            println!(
2415                "Created {} partitions (target was {})",
2416                result.len(),
2417                case.target_partitions
2418            );
2419
2420            // Check partition count
2421            assert_eq!(
2422                result.len(),
2423                case.expected_partition_count,
2424                "Case '{}': Unexpected partition count",
2425                case.name
2426            );
2427
2428            // Verify sort integrity
2429            assert!(
2430                verify_sort_integrity(&result),
2431                "Case '{}': Files within partitions are not properly ordered",
2432                case.name
2433            );
2434
2435            // Distribution check for partitions
2436            if case.file_count > 1 && case.expected_partition_count > 1 {
2437                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2438                let max_size = *group_sizes.iter().max().unwrap();
2439                let min_size = *group_sizes.iter().min().unwrap();
2440
2441                // Check partition balancing - difference shouldn't be extreme
2442                let avg_files_per_partition =
2443                    case.file_count as f64 / case.expected_partition_count as f64;
2444                assert!(
2445                    (max_size as f64) < 2.0 * avg_files_per_partition,
2446                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2447                    case.name,
2448                    max_size,
2449                    avg_files_per_partition
2450                );
2451
2452                println!("Distribution - min files: {min_size}, max files: {max_size}");
2453            }
2454        }
2455
2456        // Test error case: zero target partitions
2457        let empty_groups: Vec<FileGroup> = vec![];
2458        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2459            &schema,
2460            &empty_groups,
2461            &sort_ordering,
2462            0,
2463        )
2464        .unwrap_err();
2465
2466        assert!(
2467            err.to_string()
2468                .contains("target_partitions must be greater than 0"),
2469            "Expected error for zero target partitions"
2470        );
2471
2472        Ok(())
2473    }
2474}