datafusion_datasource/
file_scan_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21use crate::file_groups::FileGroup;
22#[allow(unused_imports)]
23use crate::schema_adapter::SchemaAdapterFactory;
24use crate::{
25    display::FileGroupsDisplay, file::FileSource,
26    file_compression_type::FileCompressionType, file_stream::FileStream,
27    source::DataSource, statistics::MinMaxStatistics, PartitionedFile, TableSchema,
28};
29use arrow::datatypes::FieldRef;
30use arrow::{
31    array::{
32        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
33        RecordBatchOptions,
34    },
35    buffer::Buffer,
36    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
37};
38use datafusion_common::config::ConfigOptions;
39use datafusion_common::{
40    exec_datafusion_err, exec_err, internal_datafusion_err, ColumnStatistics,
41    Constraints, Result, ScalarValue, Statistics,
42};
43use datafusion_execution::{
44    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
45};
46use datafusion_expr::Operator;
47use datafusion_physical_expr::expressions::{BinaryExpr, Column};
48use datafusion_physical_expr::projection::ProjectionExprs;
49use datafusion_physical_expr::utils::reassign_expr_columns;
50use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
51use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
52use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
53use datafusion_physical_expr_common::sort_expr::LexOrdering;
54use datafusion_physical_plan::projection::{
55    all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
56};
57use datafusion_physical_plan::{
58    display::{display_orderings, ProjectSchemaDisplay},
59    filter_pushdown::FilterPushdownPropagation,
60    metrics::ExecutionPlanMetricsSet,
61    DisplayAs, DisplayFormatType,
62};
63use std::{
64    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
65    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
66};
67
68use datafusion_physical_expr::equivalence::project_orderings;
69use datafusion_physical_plan::coop::cooperative;
70use datafusion_physical_plan::execution_plan::SchedulingType;
71use log::{debug, warn};
72
73/// The base configurations for a [`DataSourceExec`], the a physical plan for
74/// any given file format.
75///
76/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a ``FileScanConfig`.
77///
78/// # Example
79/// ```
80/// # use std::any::Any;
81/// # use std::sync::Arc;
82/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
83/// # use object_store::ObjectStore;
84/// # use datafusion_common::Statistics;
85/// # use datafusion_common::Result;
86/// # use datafusion_datasource::file::FileSource;
87/// # use datafusion_datasource::file_groups::FileGroup;
88/// # use datafusion_datasource::PartitionedFile;
89/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
90/// # use datafusion_datasource::file_stream::FileOpener;
91/// # use datafusion_datasource::source::DataSourceExec;
92/// # use datafusion_datasource::table_schema::TableSchema;
93/// # use datafusion_execution::object_store::ObjectStoreUrl;
94/// # use datafusion_physical_plan::ExecutionPlan;
95/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
96/// # use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
97/// # let file_schema = Arc::new(Schema::new(vec![
98/// #  Field::new("c1", DataType::Int32, false),
99/// #  Field::new("c2", DataType::Int32, false),
100/// #  Field::new("c3", DataType::Int32, false),
101/// #  Field::new("c4", DataType::Int32, false),
102/// # ]));
103/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
104/// #[derive(Clone)]
105/// # struct ParquetSource {
106/// #    projected_statistics: Option<Statistics>,
107/// #    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>
108/// # };
109/// # impl FileSource for ParquetSource {
110/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
111/// #  fn as_any(&self) -> &dyn Any { self  }
112/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
113/// #  fn with_schema(&self, _: TableSchema) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
114/// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
115/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
116/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
117/// #  fn statistics(&self) -> Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
118/// #  fn file_type(&self) -> &str { "parquet" }
119/// #  fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Result<Arc<dyn FileSource>> { Ok(Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} )) }
120/// #  fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { self.schema_adapter_factory.clone() }
121/// #  }
122/// # impl ParquetSource {
123/// #  fn new() -> Self { Self {projected_statistics: None, schema_adapter_factory: None} }
124/// # }
125/// // create FileScan config for reading parquet files from file://
126/// let object_store_url = ObjectStoreUrl::local_filesystem();
127/// let file_source = Arc::new(ParquetSource::new());
128/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
129///   .with_limit(Some(1000))            // read only the first 1000 records
130///   .with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3
131///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
132///   .with_file(PartitionedFile::new("file1.parquet", 1234))
133///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
134///   // in a  single row group
135///   .with_file_group(FileGroup::new(vec![
136///    PartitionedFile::new("file2.parquet", 56),
137///    PartitionedFile::new("file3.parquet", 78),
138///   ])).build();
139/// // create an execution plan from the config
140/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
141/// ```
142///
143/// [`DataSourceExec`]: crate::source::DataSourceExec
144/// [`DataSourceExec::from_data_source`]: crate::source::DataSourceExec::from_data_source
145#[derive(Clone)]
146pub struct FileScanConfig {
147    /// Object store URL, used to get an [`ObjectStore`] instance from
148    /// [`RuntimeEnv::object_store`]
149    ///
150    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
151    /// as `file://` or `s3://my_bucket`. It should not include the path to the
152    /// file itself. The relevant URL prefix must be registered via
153    /// [`RuntimeEnv::register_object_store`]
154    ///
155    /// [`ObjectStore`]: object_store::ObjectStore
156    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
157    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
158    pub object_store_url: ObjectStoreUrl,
159    /// Schema information including the file schema, table partition columns,
160    /// and the combined table schema.
161    ///
162    /// The table schema (file schema + partition columns) is the schema exposed
163    /// upstream of [`FileScanConfig`] (e.g. in [`DataSourceExec`]).
164    ///
165    /// See [`TableSchema`] for more information.
166    ///
167    /// [`DataSourceExec`]: crate::source::DataSourceExec
168    pub table_schema: TableSchema,
169    /// List of files to be processed, grouped into partitions
170    ///
171    /// Each file must have a schema of `file_schema` or a subset. If
172    /// a particular file has a subset, the missing columns are
173    /// padded with NULLs.
174    ///
175    /// DataFusion may attempt to read each partition of files
176    /// concurrently, however files *within* a partition will be read
177    /// sequentially, one after the next.
178    pub file_groups: Vec<FileGroup>,
179    /// Table constraints
180    pub constraints: Constraints,
181    /// Physical expressions defining the projection to apply when reading data.
182    ///
183    /// Each expression in the projection can reference columns from both the file
184    /// schema and table partition columns. If `None`, all columns from the table
185    /// schema are projected.
186    pub projection_exprs: Option<ProjectionExprs>,
187    /// The maximum number of records to read from this plan. If `None`,
188    /// all records after filtering are returned.
189    pub limit: Option<usize>,
190    /// All equivalent lexicographical orderings that describe the schema.
191    pub output_ordering: Vec<LexOrdering>,
192    /// File compression type
193    pub file_compression_type: FileCompressionType,
194    /// Are new lines in values supported for CSVOptions
195    pub new_lines_in_values: bool,
196    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
197    pub file_source: Arc<dyn FileSource>,
198    /// Batch size while creating new batches
199    /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
200    pub batch_size: Option<usize>,
201    /// Expression adapter used to adapt filters and projections that are pushed down into the scan
202    /// from the logical schema to the physical schema of the file.
203    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
204}
205
206/// A builder for [`FileScanConfig`]'s.
207///
208/// Example:
209///
210/// ```rust
211/// # use std::sync::Arc;
212/// # use arrow::datatypes::{DataType, Field, Schema};
213/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig};
214/// # use datafusion_datasource::file_compression_type::FileCompressionType;
215/// # use datafusion_datasource::file_groups::FileGroup;
216/// # use datafusion_datasource::PartitionedFile;
217/// # use datafusion_execution::object_store::ObjectStoreUrl;
218/// # use datafusion_common::Statistics;
219/// # use datafusion_datasource::file::FileSource;
220///
221/// # fn main() {
222/// # fn with_source(file_source: Arc<dyn FileSource>) {
223///     // Create a schema for our Parquet files
224///     let schema = Arc::new(Schema::new(vec![
225///         Field::new("id", DataType::Int32, false),
226///         Field::new("value", DataType::Utf8, false),
227///     ]));
228///
229///     // Create a builder for scanning Parquet files from a local filesystem
230///     let config = FileScanConfigBuilder::new(
231///         ObjectStoreUrl::local_filesystem(),
232///         schema,
233///         file_source,
234///     )
235///     // Set a limit of 1000 rows
236///     .with_limit(Some(1000))
237///     // Project only the first column
238///     .with_projection_indices(Some(vec![0]))
239///     // Add partition columns
240///     .with_table_partition_cols(vec![
241///         Field::new("date", DataType::Utf8, false),
242///     ])
243///     // Add a file group with two files
244///     .with_file_group(FileGroup::new(vec![
245///         PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024),
246///         PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048),
247///     ]))
248///     // Set compression type
249///     .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
250///     // Build the final config
251///     .build();
252/// # }
253/// # }
254/// ```
255#[derive(Clone)]
256pub struct FileScanConfigBuilder {
257    object_store_url: ObjectStoreUrl,
258    /// Schema information including the file schema, table partition columns,
259    /// and the combined table schema.
260    ///
261    /// This schema is used to read the files, but the file schema is **not** necessarily
262    /// the schema of the physical files. Rather this is the schema that the
263    /// physical file schema will be mapped onto, and the schema that the
264    /// [`DataSourceExec`] will return.
265    ///
266    /// [`DataSourceExec`]: crate::source::DataSourceExec
267    table_schema: TableSchema,
268    file_source: Arc<dyn FileSource>,
269    limit: Option<usize>,
270    projection_indices: Option<Vec<usize>>,
271    constraints: Option<Constraints>,
272    file_groups: Vec<FileGroup>,
273    statistics: Option<Statistics>,
274    output_ordering: Vec<LexOrdering>,
275    file_compression_type: Option<FileCompressionType>,
276    new_lines_in_values: Option<bool>,
277    batch_size: Option<usize>,
278    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
279}
280
281impl FileScanConfigBuilder {
282    /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files.
283    ///
284    /// # Parameters:
285    /// * `object_store_url`: See [`FileScanConfig::object_store_url`]
286    /// * `file_schema`: See [`FileScanConfig::file_schema`]
287    /// * `file_source`: See [`FileScanConfig::file_source`]
288    pub fn new(
289        object_store_url: ObjectStoreUrl,
290        file_schema: SchemaRef,
291        file_source: Arc<dyn FileSource>,
292    ) -> Self {
293        Self {
294            object_store_url,
295            table_schema: TableSchema::from_file_schema(file_schema),
296            file_source,
297            file_groups: vec![],
298            statistics: None,
299            output_ordering: vec![],
300            file_compression_type: None,
301            new_lines_in_values: None,
302            limit: None,
303            projection_indices: None,
304            constraints: None,
305            batch_size: None,
306            expr_adapter_factory: None,
307        }
308    }
309
310    /// Set the maximum number of records to read from this plan. If `None`,
311    /// all records after filtering are returned.
312    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
313        self.limit = limit;
314        self
315    }
316
317    /// Set the file source for scanning files.
318    ///
319    /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
320    /// after the builder has been created.
321    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
322        self.file_source = file_source;
323        self
324    }
325
326    pub fn table_schema(&self) -> &SchemaRef {
327        self.table_schema.table_schema()
328    }
329
330    /// Set the columns on which to project the data. Indexes that are higher than the
331    /// number of columns of `file_schema` refer to `table_partition_cols`.
332    ///
333    /// # Deprecated
334    /// Use [`Self::with_projection_indices`] instead. This method will be removed in a future release.
335    #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
336    pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
337        self.with_projection_indices(indices)
338    }
339
340    /// Set the columns on which to project the data using column indices.
341    ///
342    /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`.
343    pub fn with_projection_indices(mut self, indices: Option<Vec<usize>>) -> Self {
344        self.projection_indices = indices;
345        self
346    }
347
348    /// Set the partitioning columns
349    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
350        let table_partition_cols: Vec<FieldRef> = table_partition_cols
351            .into_iter()
352            .map(|f| Arc::new(f) as FieldRef)
353            .collect();
354        self.table_schema = self
355            .table_schema
356            .with_table_partition_cols(table_partition_cols);
357        self
358    }
359
360    /// Set the table constraints
361    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
362        self.constraints = Some(constraints);
363        self
364    }
365
366    /// Set the estimated overall statistics of the files, taking `filters` into account.
367    /// Defaults to [`Statistics::new_unknown`].
368    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
369        self.statistics = Some(statistics);
370        self
371    }
372
373    /// Set the list of files to be processed, grouped into partitions.
374    ///
375    /// Each file must have a schema of `file_schema` or a subset. If
376    /// a particular file has a subset, the missing columns are
377    /// padded with NULLs.
378    ///
379    /// DataFusion may attempt to read each partition of files
380    /// concurrently, however files *within* a partition will be read
381    /// sequentially, one after the next.
382    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
383        self.file_groups = file_groups;
384        self
385    }
386
387    /// Add a new file group
388    ///
389    /// See [`Self::with_file_groups`] for more information
390    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
391        self.file_groups.push(file_group);
392        self
393    }
394
395    /// Add a file as a single group
396    ///
397    /// See [`Self::with_file_groups`] for more information.
398    pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
399        self.with_file_group(FileGroup::new(vec![partitioned_file]))
400    }
401
402    /// Set the output ordering of the files
403    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
404        self.output_ordering = output_ordering;
405        self
406    }
407
408    /// Set the file compression type
409    pub fn with_file_compression_type(
410        mut self,
411        file_compression_type: FileCompressionType,
412    ) -> Self {
413        self.file_compression_type = Some(file_compression_type);
414        self
415    }
416
417    /// Set whether new lines in values are supported for CSVOptions
418    ///
419    /// Parsing newlines in quoted values may be affected by execution behaviour such as
420    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
421    /// parsed successfully, which may reduce performance.
422    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
423        self.new_lines_in_values = Some(new_lines_in_values);
424        self
425    }
426
427    /// Set the batch_size property
428    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
429        self.batch_size = batch_size;
430        self
431    }
432
433    /// Register an expression adapter used to adapt filters and projections that are pushed down into the scan
434    /// from the logical schema to the physical schema of the file.
435    /// This can include things like:
436    /// - Column ordering changes
437    /// - Handling of missing columns
438    /// - Rewriting expression to use pre-computed values or file format specific optimizations
439    pub fn with_expr_adapter(
440        mut self,
441        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
442    ) -> Self {
443        self.expr_adapter_factory = expr_adapter;
444        self
445    }
446
447    /// Build the final [`FileScanConfig`] with all the configured settings.
448    ///
449    /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
450    /// Any unset optional fields will use their default values.
451    pub fn build(self) -> FileScanConfig {
452        let Self {
453            object_store_url,
454            table_schema,
455            file_source,
456            limit,
457            projection_indices,
458            constraints,
459            file_groups,
460            statistics,
461            output_ordering,
462            file_compression_type,
463            new_lines_in_values,
464            batch_size,
465            expr_adapter_factory: expr_adapter,
466        } = self;
467
468        let constraints = constraints.unwrap_or_default();
469        let statistics = statistics
470            .unwrap_or_else(|| Statistics::new_unknown(table_schema.file_schema()));
471
472        let file_source = file_source
473            .with_statistics(statistics.clone())
474            .with_schema(table_schema.clone());
475        let file_compression_type =
476            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
477        let new_lines_in_values = new_lines_in_values.unwrap_or(false);
478
479        // Convert projection indices to ProjectionExprs using the final table schema
480        // (which now includes partition columns if they were added)
481        let projection_exprs = projection_indices.map(|indices| {
482            ProjectionExprs::from_indices(&indices, table_schema.table_schema())
483        });
484
485        FileScanConfig {
486            object_store_url,
487            table_schema,
488            file_source,
489            limit,
490            projection_exprs,
491            constraints,
492            file_groups,
493            output_ordering,
494            file_compression_type,
495            new_lines_in_values,
496            batch_size,
497            expr_adapter_factory: expr_adapter,
498        }
499    }
500}
501
502impl From<FileScanConfig> for FileScanConfigBuilder {
503    fn from(config: FileScanConfig) -> Self {
504        Self {
505            object_store_url: config.object_store_url,
506            table_schema: config.table_schema,
507            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
508            file_groups: config.file_groups,
509            statistics: config.file_source.statistics().ok(),
510            output_ordering: config.output_ordering,
511            file_compression_type: Some(config.file_compression_type),
512            new_lines_in_values: Some(config.new_lines_in_values),
513            limit: config.limit,
514            projection_indices: config
515                .projection_exprs
516                .map(|p| p.ordered_column_indices()),
517            constraints: Some(config.constraints),
518            batch_size: config.batch_size,
519            expr_adapter_factory: config.expr_adapter_factory,
520        }
521    }
522}
523
524impl DataSource for FileScanConfig {
525    fn open(
526        &self,
527        partition: usize,
528        context: Arc<TaskContext>,
529    ) -> Result<SendableRecordBatchStream> {
530        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
531        let batch_size = self
532            .batch_size
533            .unwrap_or_else(|| context.session_config().batch_size());
534
535        let source = self
536            .file_source
537            .with_batch_size(batch_size)
538            .with_projection(self);
539
540        let opener = source.create_file_opener(object_store, self, partition);
541
542        let stream = FileStream::new(self, partition, opener, source.metrics())?;
543        Ok(Box::pin(cooperative(stream)))
544    }
545
546    fn as_any(&self) -> &dyn Any {
547        self
548    }
549
550    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
551        match t {
552            DisplayFormatType::Default | DisplayFormatType::Verbose => {
553                let schema = self.projected_schema();
554                let orderings = get_projected_output_ordering(self, &schema);
555
556                write!(f, "file_groups=")?;
557                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
558
559                if !schema.fields().is_empty() {
560                    write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
561                }
562
563                if let Some(limit) = self.limit {
564                    write!(f, ", limit={limit}")?;
565                }
566
567                display_orderings(f, &orderings)?;
568
569                if !self.constraints.is_empty() {
570                    write!(f, ", {}", self.constraints)?;
571                }
572
573                self.fmt_file_source(t, f)
574            }
575            DisplayFormatType::TreeRender => {
576                writeln!(f, "format={}", self.file_source.file_type())?;
577                self.file_source.fmt_extra(t, f)?;
578                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
579                writeln!(f, "files={num_files}")?;
580                Ok(())
581            }
582        }
583    }
584
585    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
586    fn repartitioned(
587        &self,
588        target_partitions: usize,
589        repartition_file_min_size: usize,
590        output_ordering: Option<LexOrdering>,
591    ) -> Result<Option<Arc<dyn DataSource>>> {
592        let source = self.file_source.repartitioned(
593            target_partitions,
594            repartition_file_min_size,
595            output_ordering,
596            self,
597        )?;
598
599        Ok(source.map(|s| Arc::new(s) as _))
600    }
601
602    fn output_partitioning(&self) -> Partitioning {
603        Partitioning::UnknownPartitioning(self.file_groups.len())
604    }
605
606    fn eq_properties(&self) -> EquivalenceProperties {
607        let (schema, constraints, _, orderings) = self.project();
608        let mut eq_properties =
609            EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings)
610                .with_constraints(constraints);
611        if let Some(filter) = self.file_source.filter() {
612            // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
613            // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
614            match Self::add_filter_equivalence_info(filter, &mut eq_properties, &schema) {
615                Ok(()) => {}
616                Err(e) => {
617                    warn!("Failed to add filter equivalence info: {e}");
618                    #[cfg(debug_assertions)]
619                    panic!("Failed to add filter equivalence info: {e}");
620                }
621            }
622        }
623        eq_properties
624    }
625
626    fn scheduling_type(&self) -> SchedulingType {
627        SchedulingType::Cooperative
628    }
629
630    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
631        if let Some(partition) = partition {
632            // Get statistics for a specific partition
633            if let Some(file_group) = self.file_groups.get(partition) {
634                if let Some(stat) = file_group.file_statistics(None) {
635                    // Project the statistics based on the projection
636                    let table_cols_stats = self
637                        .projection_indices()
638                        .into_iter()
639                        .map(|idx| {
640                            if idx < self.file_schema().fields().len() {
641                                stat.column_statistics[idx].clone()
642                            } else {
643                                // TODO provide accurate stat for partition column
644                                // See https://github.com/apache/datafusion/issues/1186
645                                ColumnStatistics::new_unknown()
646                            }
647                        })
648                        .collect();
649
650                    return Ok(Statistics {
651                        num_rows: stat.num_rows,
652                        total_byte_size: stat.total_byte_size,
653                        column_statistics: table_cols_stats,
654                    });
655                }
656            }
657            // If no statistics available for this partition, return unknown
658            Ok(Statistics::new_unknown(&self.projected_schema()))
659        } else {
660            // Return aggregate statistics across all partitions
661            Ok(self.projected_stats())
662        }
663    }
664
665    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
666        let source = FileScanConfigBuilder::from(self.clone())
667            .with_limit(limit)
668            .build();
669        Some(Arc::new(source))
670    }
671
672    fn fetch(&self) -> Option<usize> {
673        self.limit
674    }
675
676    fn metrics(&self) -> ExecutionPlanMetricsSet {
677        self.file_source.metrics().clone()
678    }
679
680    fn try_swapping_with_projection(
681        &self,
682        projection: &[ProjectionExpr],
683    ) -> Result<Option<Arc<dyn DataSource>>> {
684        // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
685
686        // Must be all column references, with no table partition columns (which can not be projected)
687        let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
688            proj_expr
689                .expr
690                .as_any()
691                .downcast_ref::<Column>()
692                .map(|expr| expr.index() >= self.file_schema().fields().len())
693                .unwrap_or(false)
694        });
695
696        // If there is any non-column or alias-carrier expression, Projection should not be removed.
697        let no_aliases = all_alias_free_columns(projection);
698
699        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
700            let file_scan = self.clone();
701            let source = Arc::clone(&file_scan.file_source);
702            let new_projections = new_projections_for_columns(
703                projection,
704                &file_scan
705                    .projection_exprs
706                    .as_ref()
707                    .map(|p| p.ordered_column_indices())
708                    .unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()),
709            );
710
711            Arc::new(
712                FileScanConfigBuilder::from(file_scan)
713                    // Assign projected statistics to source
714                    .with_projection_indices(Some(new_projections))
715                    .with_source(source)
716                    .build(),
717            ) as _
718        }))
719    }
720
721    fn try_pushdown_filters(
722        &self,
723        filters: Vec<Arc<dyn PhysicalExpr>>,
724        config: &ConfigOptions,
725    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
726        let result = self.file_source.try_pushdown_filters(filters, config)?;
727        match result.updated_node {
728            Some(new_file_source) => {
729                let file_scan_config = FileScanConfigBuilder::from(self.clone())
730                    .with_source(new_file_source)
731                    .build();
732                Ok(FilterPushdownPropagation {
733                    filters: result.filters,
734                    updated_node: Some(Arc::new(file_scan_config) as _),
735                })
736            }
737            None => {
738                // If the file source does not support filter pushdown, return the original config
739                Ok(FilterPushdownPropagation {
740                    filters: result.filters,
741                    updated_node: None,
742                })
743            }
744        }
745    }
746}
747
748impl FileScanConfig {
749    /// Get the file schema (schema of the files without partition columns)
750    pub fn file_schema(&self) -> &SchemaRef {
751        self.table_schema.file_schema()
752    }
753
754    /// Get the table partition columns
755    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
756        self.table_schema.table_partition_cols()
757    }
758
759    fn projection_indices(&self) -> Vec<usize> {
760        match &self.projection_exprs {
761            Some(proj) => proj.ordered_column_indices(),
762            None => (0..self.file_schema().fields().len()
763                + self.table_partition_cols().len())
764                .collect(),
765        }
766    }
767
768    pub fn projected_stats(&self) -> Statistics {
769        let statistics = self.file_source.statistics().unwrap();
770
771        let table_cols_stats = self
772            .projection_indices()
773            .into_iter()
774            .map(|idx| {
775                if idx < self.file_schema().fields().len() {
776                    statistics.column_statistics[idx].clone()
777                } else {
778                    // TODO provide accurate stat for partition column (#1186)
779                    ColumnStatistics::new_unknown()
780                }
781            })
782            .collect();
783
784        Statistics {
785            num_rows: statistics.num_rows,
786            // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
787            total_byte_size: statistics.total_byte_size,
788            column_statistics: table_cols_stats,
789        }
790    }
791
792    pub fn projected_schema(&self) -> Arc<Schema> {
793        let table_fields: Vec<_> = self
794            .projection_indices()
795            .into_iter()
796            .map(|idx| {
797                if idx < self.file_schema().fields().len() {
798                    self.file_schema().field(idx).clone()
799                } else {
800                    let partition_idx = idx - self.file_schema().fields().len();
801                    Arc::unwrap_or_clone(Arc::clone(
802                        &self.table_partition_cols()[partition_idx],
803                    ))
804                }
805            })
806            .collect();
807
808        Arc::new(Schema::new_with_metadata(
809            table_fields,
810            self.file_schema().metadata().clone(),
811        ))
812    }
813
814    fn add_filter_equivalence_info(
815        filter: Arc<dyn PhysicalExpr>,
816        eq_properties: &mut EquivalenceProperties,
817        schema: &Schema,
818    ) -> Result<()> {
819        // Gather valid equality pairs from the filter expression
820        let equal_pairs = split_conjunction(&filter).into_iter().filter_map(|expr| {
821            // Ignore any binary expressions that reference non-existent columns in the current schema
822            // (e.g. due to unnecessary projections being removed)
823            reassign_expr_columns(Arc::clone(expr), schema)
824                .ok()
825                .and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
826                    Some(expr) if expr.op() == &Operator::Eq => {
827                        Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
828                    }
829                    _ => None,
830                })
831        });
832
833        for (lhs, rhs) in equal_pairs {
834            eq_properties.add_equal_conditions(lhs, rhs)?
835        }
836
837        Ok(())
838    }
839
840    pub fn projected_constraints(&self) -> Constraints {
841        let indexes = self.projection_indices();
842        self.constraints.project(&indexes).unwrap_or_default()
843    }
844
845    /// Specifies whether newlines in (quoted) values are supported.
846    ///
847    /// Parsing newlines in quoted values may be affected by execution behaviour such as
848    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
849    /// parsed successfully, which may reduce performance.
850    ///
851    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
852    pub fn newlines_in_values(&self) -> bool {
853        self.new_lines_in_values
854    }
855
856    /// Project the schema, constraints, and the statistics on the given column indices
857    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
858        if self.projection_exprs.is_none() && self.table_partition_cols().is_empty() {
859            return (
860                Arc::clone(self.file_schema()),
861                self.constraints.clone(),
862                self.file_source.statistics().unwrap().clone(),
863                self.output_ordering.clone(),
864            );
865        }
866
867        let schema = self.projected_schema();
868        let constraints = self.projected_constraints();
869        let stats = self.projected_stats();
870
871        let output_ordering = get_projected_output_ordering(self, &schema);
872
873        (schema, constraints, stats, output_ordering)
874    }
875
876    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
877        let fields = self.file_schema().fields();
878
879        self.projection_exprs.as_ref().map(|p| {
880            let column_indices = p.ordered_column_indices();
881
882            column_indices
883                .iter()
884                .filter(|&&col_i| col_i < fields.len())
885                .map(|&col_i| self.file_schema().field(col_i).name())
886                .cloned()
887                .collect::<Vec<_>>()
888        })
889    }
890
891    /// Projects only file schema, ignoring partition columns
892    pub fn projected_file_schema(&self) -> SchemaRef {
893        let fields = self.file_column_projection_indices().map(|indices| {
894            indices
895                .iter()
896                .map(|col_idx| self.file_schema().field(*col_idx))
897                .cloned()
898                .collect::<Vec<_>>()
899        });
900
901        fields.map_or_else(
902            || Arc::clone(self.file_schema()),
903            |f| {
904                Arc::new(Schema::new_with_metadata(
905                    f,
906                    self.file_schema().metadata.clone(),
907                ))
908            },
909        )
910    }
911
912    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
913        self.projection_exprs.as_ref().map(|p| {
914            p.ordered_column_indices()
915                .into_iter()
916                .filter(|&i| i < self.file_schema().fields().len())
917                .collect::<Vec<_>>()
918        })
919    }
920
921    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
922    ///
923    /// The method distributes files across a target number of partitions while ensuring
924    /// files within each partition maintain sort order based on their min/max statistics.
925    ///
926    /// The algorithm works by:
927    /// 1. Takes files sorted by minimum values
928    /// 2. For each file:
929    ///   - Finds eligible groups (empty or where file's min > group's last max)
930    ///   - Selects the smallest eligible group
931    ///   - Creates a new group if needed
932    ///
933    /// # Parameters
934    /// * `table_schema`: Schema containing information about the columns
935    /// * `file_groups`: The original file groups to split
936    /// * `sort_order`: The lexicographical ordering to maintain within each group
937    /// * `target_partitions`: The desired number of output partitions
938    ///
939    /// # Returns
940    /// A new set of file groups, where files within each group are non-overlapping with respect to
941    /// their min/max statistics and maintain the specified sort order.
942    pub fn split_groups_by_statistics_with_target_partitions(
943        table_schema: &SchemaRef,
944        file_groups: &[FileGroup],
945        sort_order: &LexOrdering,
946        target_partitions: usize,
947    ) -> Result<Vec<FileGroup>> {
948        if target_partitions == 0 {
949            return Err(internal_datafusion_err!(
950                "target_partitions must be greater than 0"
951            ));
952        }
953
954        let flattened_files = file_groups
955            .iter()
956            .flat_map(FileGroup::iter)
957            .collect::<Vec<_>>();
958
959        if flattened_files.is_empty() {
960            return Ok(vec![]);
961        }
962
963        let statistics = MinMaxStatistics::new_from_files(
964            sort_order,
965            table_schema,
966            None,
967            flattened_files.iter().copied(),
968        )?;
969
970        let indices_sorted_by_min = statistics.min_values_sorted();
971
972        // Initialize with target_partitions empty groups
973        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
974
975        for (idx, min) in indices_sorted_by_min {
976            if let Some((_, group)) = file_groups_indices
977                .iter_mut()
978                .enumerate()
979                .filter(|(_, group)| {
980                    group.is_empty()
981                        || min
982                            > statistics
983                                .max(*group.last().expect("groups should not be empty"))
984                })
985                .min_by_key(|(_, group)| group.len())
986            {
987                group.push(idx);
988            } else {
989                // Create a new group if no existing group fits
990                file_groups_indices.push(vec![idx]);
991            }
992        }
993
994        // Remove any empty groups
995        file_groups_indices.retain(|group| !group.is_empty());
996
997        // Assemble indices back into groups of PartitionedFiles
998        Ok(file_groups_indices
999            .into_iter()
1000            .map(|file_group_indices| {
1001                FileGroup::new(
1002                    file_group_indices
1003                        .into_iter()
1004                        .map(|idx| flattened_files[idx].clone())
1005                        .collect(),
1006                )
1007            })
1008            .collect())
1009    }
1010
1011    /// Attempts to do a bin-packing on files into file groups, such that any two files
1012    /// in a file group are ordered and non-overlapping with respect to their statistics.
1013    /// It will produce the smallest number of file groups possible.
1014    pub fn split_groups_by_statistics(
1015        table_schema: &SchemaRef,
1016        file_groups: &[FileGroup],
1017        sort_order: &LexOrdering,
1018    ) -> Result<Vec<FileGroup>> {
1019        let flattened_files = file_groups
1020            .iter()
1021            .flat_map(FileGroup::iter)
1022            .collect::<Vec<_>>();
1023        // First Fit:
1024        // * Choose the first file group that a file can be placed into.
1025        // * If it fits into no existing file groups, create a new one.
1026        //
1027        // By sorting files by min values and then applying first-fit bin packing,
1028        // we can produce the smallest number of file groups such that
1029        // files within a group are in order and non-overlapping.
1030        //
1031        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
1032        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
1033
1034        if flattened_files.is_empty() {
1035            return Ok(vec![]);
1036        }
1037
1038        let statistics = MinMaxStatistics::new_from_files(
1039            sort_order,
1040            table_schema,
1041            None,
1042            flattened_files.iter().copied(),
1043        )
1044        .map_err(|e| {
1045            e.context("construct min/max statistics for split_groups_by_statistics")
1046        })?;
1047
1048        let indices_sorted_by_min = statistics.min_values_sorted();
1049        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
1050
1051        for (idx, min) in indices_sorted_by_min {
1052            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
1053                // If our file is non-overlapping and comes _after_ the last file,
1054                // it fits in this file group.
1055                min > statistics.max(
1056                    *group
1057                        .last()
1058                        .expect("groups should be nonempty at construction"),
1059                )
1060            });
1061            match file_group_to_insert {
1062                Some(group) => group.push(idx),
1063                None => file_groups_indices.push(vec![idx]),
1064            }
1065        }
1066
1067        // Assemble indices back into groups of PartitionedFiles
1068        Ok(file_groups_indices
1069            .into_iter()
1070            .map(|file_group_indices| {
1071                file_group_indices
1072                    .into_iter()
1073                    .map(|idx| flattened_files[idx].clone())
1074                    .collect()
1075            })
1076            .collect())
1077    }
1078
1079    /// Write the data_type based on file_source
1080    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1081        write!(f, ", file_type={}", self.file_source.file_type())?;
1082        self.file_source.fmt_extra(t, f)
1083    }
1084
1085    /// Returns the file_source
1086    pub fn file_source(&self) -> &Arc<dyn FileSource> {
1087        &self.file_source
1088    }
1089}
1090
1091impl Debug for FileScanConfig {
1092    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1093        write!(f, "FileScanConfig {{")?;
1094        write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1095
1096        write!(
1097            f,
1098            "statistics={:?}, ",
1099            self.file_source.statistics().unwrap()
1100        )?;
1101
1102        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1103        write!(f, "}}")
1104    }
1105}
1106
1107impl DisplayAs for FileScanConfig {
1108    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1109        let schema = self.projected_schema();
1110        let orderings = get_projected_output_ordering(self, &schema);
1111
1112        write!(f, "file_groups=")?;
1113        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1114
1115        if !schema.fields().is_empty() {
1116            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1117        }
1118
1119        if let Some(limit) = self.limit {
1120            write!(f, ", limit={limit}")?;
1121        }
1122
1123        display_orderings(f, &orderings)?;
1124
1125        if !self.constraints.is_empty() {
1126            write!(f, ", {}", self.constraints)?;
1127        }
1128
1129        Ok(())
1130    }
1131}
1132
1133/// A helper that projects partition columns into the file record batches.
1134///
1135/// One interesting trick is the usage of a cache for the key buffers of the partition column
1136/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
1137/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
1138/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
1139pub struct PartitionColumnProjector {
1140    /// An Arrow buffer initialized to zeros that represents the key array of all partition
1141    /// columns (partition columns are materialized by dictionary arrays with only one
1142    /// value in the dictionary, thus all the keys are equal to zero).
1143    key_buffer_cache: ZeroBufferGenerators,
1144    /// Mapping between the indexes in the list of partition columns and the target
1145    /// schema. Sorted by index in the target schema so that we can iterate on it to
1146    /// insert the partition columns in the target record batch.
1147    projected_partition_indexes: Vec<(usize, usize)>,
1148    /// The schema of the table once the projection was applied.
1149    projected_schema: SchemaRef,
1150}
1151
1152impl PartitionColumnProjector {
1153    // Create a projector to insert the partitioning columns into batches read from files
1154    // - `projected_schema`: the target schema with both file and partitioning columns
1155    // - `table_partition_cols`: all the partitioning column names
1156    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
1157        let mut idx_map = HashMap::new();
1158        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
1159            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
1160                idx_map.insert(partition_idx, schema_idx);
1161            }
1162        }
1163
1164        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
1165        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
1166
1167        Self {
1168            projected_partition_indexes,
1169            key_buffer_cache: Default::default(),
1170            projected_schema,
1171        }
1172    }
1173
1174    // Transform the batch read from the file by inserting the partitioning columns
1175    // to the right positions as deduced from `projected_schema`
1176    // - `file_batch`: batch read from the file, with internal projection applied
1177    // - `partition_values`: the list of partition values, one for each partition column
1178    pub fn project(
1179        &mut self,
1180        file_batch: RecordBatch,
1181        partition_values: &[ScalarValue],
1182    ) -> Result<RecordBatch> {
1183        let expected_cols =
1184            self.projected_schema.fields().len() - self.projected_partition_indexes.len();
1185
1186        if file_batch.columns().len() != expected_cols {
1187            return exec_err!(
1188                "Unexpected batch schema from file, expected {} cols but got {}",
1189                expected_cols,
1190                file_batch.columns().len()
1191            );
1192        }
1193
1194        let mut cols = file_batch.columns().to_vec();
1195        for &(pidx, sidx) in &self.projected_partition_indexes {
1196            let p_value = partition_values.get(pidx).ok_or_else(|| {
1197                exec_datafusion_err!("Invalid partitioning found on disk")
1198            })?;
1199
1200            let mut partition_value = Cow::Borrowed(p_value);
1201
1202            // check if user forgot to dict-encode the partition value
1203            let field = self.projected_schema.field(sidx);
1204            let expected_data_type = field.data_type();
1205            let actual_data_type = partition_value.data_type();
1206            if let DataType::Dictionary(key_type, _) = expected_data_type {
1207                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
1208                    warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
1209                    partition_value = Cow::Owned(ScalarValue::Dictionary(
1210                        key_type.clone(),
1211                        Box::new(partition_value.as_ref().clone()),
1212                    ));
1213                }
1214            }
1215
1216            cols.insert(
1217                sidx,
1218                create_output_array(
1219                    &mut self.key_buffer_cache,
1220                    partition_value.as_ref(),
1221                    file_batch.num_rows(),
1222                )?,
1223            )
1224        }
1225
1226        RecordBatch::try_new_with_options(
1227            Arc::clone(&self.projected_schema),
1228            cols,
1229            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
1230        )
1231        .map_err(Into::into)
1232    }
1233}
1234
1235#[derive(Debug, Default)]
1236struct ZeroBufferGenerators {
1237    gen_i8: ZeroBufferGenerator<i8>,
1238    gen_i16: ZeroBufferGenerator<i16>,
1239    gen_i32: ZeroBufferGenerator<i32>,
1240    gen_i64: ZeroBufferGenerator<i64>,
1241    gen_u8: ZeroBufferGenerator<u8>,
1242    gen_u16: ZeroBufferGenerator<u16>,
1243    gen_u32: ZeroBufferGenerator<u32>,
1244    gen_u64: ZeroBufferGenerator<u64>,
1245}
1246
1247/// Generate a arrow [`Buffer`] that contains zero values.
1248#[derive(Debug, Default)]
1249struct ZeroBufferGenerator<T>
1250where
1251    T: ArrowNativeType,
1252{
1253    cache: Option<Buffer>,
1254    _t: PhantomData<T>,
1255}
1256
1257impl<T> ZeroBufferGenerator<T>
1258where
1259    T: ArrowNativeType,
1260{
1261    const SIZE: usize = size_of::<T>();
1262
1263    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
1264        match &mut self.cache {
1265            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
1266                buf.slice_with_length(0, n_vals * Self::SIZE)
1267            }
1268            _ => {
1269                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
1270                key_buffer_builder.advance(n_vals); // keys are all 0
1271                self.cache.insert(key_buffer_builder.finish()).clone()
1272            }
1273        }
1274    }
1275}
1276
1277fn create_dict_array<T>(
1278    buffer_gen: &mut ZeroBufferGenerator<T>,
1279    dict_val: &ScalarValue,
1280    len: usize,
1281    data_type: DataType,
1282) -> Result<ArrayRef>
1283where
1284    T: ArrowNativeType,
1285{
1286    let dict_vals = dict_val.to_array()?;
1287
1288    let sliced_key_buffer = buffer_gen.get_buffer(len);
1289
1290    // assemble pieces together
1291    let mut builder = ArrayData::builder(data_type)
1292        .len(len)
1293        .add_buffer(sliced_key_buffer);
1294    builder = builder.add_child_data(dict_vals.to_data());
1295    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
1296        builder.build().unwrap(),
1297    )))
1298}
1299
1300fn create_output_array(
1301    key_buffer_cache: &mut ZeroBufferGenerators,
1302    val: &ScalarValue,
1303    len: usize,
1304) -> Result<ArrayRef> {
1305    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
1306        match key_type.as_ref() {
1307            DataType::Int8 => {
1308                return create_dict_array(
1309                    &mut key_buffer_cache.gen_i8,
1310                    dict_val,
1311                    len,
1312                    val.data_type(),
1313                );
1314            }
1315            DataType::Int16 => {
1316                return create_dict_array(
1317                    &mut key_buffer_cache.gen_i16,
1318                    dict_val,
1319                    len,
1320                    val.data_type(),
1321                );
1322            }
1323            DataType::Int32 => {
1324                return create_dict_array(
1325                    &mut key_buffer_cache.gen_i32,
1326                    dict_val,
1327                    len,
1328                    val.data_type(),
1329                );
1330            }
1331            DataType::Int64 => {
1332                return create_dict_array(
1333                    &mut key_buffer_cache.gen_i64,
1334                    dict_val,
1335                    len,
1336                    val.data_type(),
1337                );
1338            }
1339            DataType::UInt8 => {
1340                return create_dict_array(
1341                    &mut key_buffer_cache.gen_u8,
1342                    dict_val,
1343                    len,
1344                    val.data_type(),
1345                );
1346            }
1347            DataType::UInt16 => {
1348                return create_dict_array(
1349                    &mut key_buffer_cache.gen_u16,
1350                    dict_val,
1351                    len,
1352                    val.data_type(),
1353                );
1354            }
1355            DataType::UInt32 => {
1356                return create_dict_array(
1357                    &mut key_buffer_cache.gen_u32,
1358                    dict_val,
1359                    len,
1360                    val.data_type(),
1361                );
1362            }
1363            DataType::UInt64 => {
1364                return create_dict_array(
1365                    &mut key_buffer_cache.gen_u64,
1366                    dict_val,
1367                    len,
1368                    val.data_type(),
1369                );
1370            }
1371            _ => {}
1372        }
1373    }
1374
1375    val.to_array_of_size(len)
1376}
1377
1378/// The various listing tables does not attempt to read all files
1379/// concurrently, instead they will read files in sequence within a
1380/// partition.  This is an important property as it allows plans to
1381/// run against 1000s of files and not try to open them all
1382/// concurrently.
1383///
1384/// However, it means if we assign more than one file to a partition
1385/// the output sort order will not be preserved as illustrated in the
1386/// following diagrams:
1387///
1388/// When only 1 file is assigned to each partition, each partition is
1389/// correctly sorted on `(A, B, C)`
1390///
1391/// ```text
1392/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
1393///   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
1394/// ┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
1395///   │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
1396/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
1397///   │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
1398/// ┃                                          │                    │                     ┃
1399///   │                   │ │                    │                    │                 │
1400/// ┃                                          │                    │                     ┃
1401///   │                   │ │                    │                    │                 │
1402/// ┃                                          │                    │                     ┃
1403///   │                   │ │                    │                    │                 │
1404/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1405///      DataFusion           DataFusion           DataFusion           DataFusion
1406/// ┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
1407///  ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1408///
1409///                                      DataSourceExec
1410/// ```
1411///
1412/// However, when more than 1 file is assigned to each partition, each
1413/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
1414/// file is scanned, the same values for A, B and C can be repeated in
1415/// the same sorted stream
1416///
1417///```text
1418/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1419///   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1420/// ┃   ┌───────────────┐     ┌──────────────┐ │
1421///   │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
1422/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1423///   │ └───────────────┘ │ │ └──────────────┘   ┃
1424/// ┃   ┌───────────────┐     ┌──────────────┐ │
1425///   │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
1426/// ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1427///   │ └───────────────┘ │ │ └──────────────┘   ┃
1428/// ┃                                          │
1429///   │                   │ │                    ┃
1430/// ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
1431///      DataFusion           DataFusion         ┃
1432/// ┃    Partition 1          Partition 2
1433///  ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
1434///
1435///              DataSourceExec
1436/// ```
1437fn get_projected_output_ordering(
1438    base_config: &FileScanConfig,
1439    projected_schema: &SchemaRef,
1440) -> Vec<LexOrdering> {
1441    let projected_orderings =
1442        project_orderings(&base_config.output_ordering, projected_schema);
1443
1444    let mut all_orderings = vec![];
1445    for new_ordering in projected_orderings {
1446        // Check if any file groups are not sorted
1447        if base_config.file_groups.iter().any(|group| {
1448            if group.len() <= 1 {
1449                // File groups with <= 1 files are always sorted
1450                return false;
1451            }
1452
1453            let indices = base_config
1454                .projection_exprs
1455                .as_ref()
1456                .map(|p| p.ordered_column_indices());
1457
1458            let statistics = match MinMaxStatistics::new_from_files(
1459                &new_ordering,
1460                projected_schema,
1461                indices.as_deref(),
1462                group.iter(),
1463            ) {
1464                Ok(statistics) => statistics,
1465                Err(e) => {
1466                    log::trace!("Error fetching statistics for file group: {e}");
1467                    // we can't prove that it's ordered, so we have to reject it
1468                    return true;
1469                }
1470            };
1471
1472            !statistics.is_sorted()
1473        }) {
1474            debug!(
1475                "Skipping specified output ordering {:?}. \
1476                Some file groups couldn't be determined to be sorted: {:?}",
1477                base_config.output_ordering[0], base_config.file_groups
1478            );
1479            continue;
1480        }
1481
1482        all_orderings.push(new_ordering);
1483    }
1484    all_orderings
1485}
1486
1487/// Convert type to a type suitable for use as a `ListingTable`
1488/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
1489/// a reasonable trade off between a reasonable number of partition
1490/// values and space efficiency.
1491///
1492/// This use this to specify types for partition columns. However
1493/// you MAY also choose not to dictionary-encode the data or to use a
1494/// different dictionary type.
1495///
1496/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same say.
1497pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1498    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1499}
1500
1501/// Convert a [`ScalarValue`] of partition columns to a type, as
1502/// described in the documentation of [`wrap_partition_type_in_dict`],
1503/// which can wrap the types.
1504pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1505    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1506}
1507
1508#[cfg(test)]
1509mod tests {
1510    use super::*;
1511    use crate::test_util::col;
1512    use crate::{
1513        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1514        verify_sort_integrity,
1515    };
1516
1517    use arrow::array::{Int32Array, RecordBatch};
1518    use datafusion_common::stats::Precision;
1519    use datafusion_common::{assert_batches_eq, internal_err};
1520    use datafusion_expr::{Operator, SortExpr};
1521    use datafusion_physical_expr::create_physical_sort_expr;
1522    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
1523    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
1524
1525    /// Returns the column names on the schema
1526    pub fn columns(schema: &Schema) -> Vec<String> {
1527        schema.fields().iter().map(|f| f.name().clone()).collect()
1528    }
1529
1530    #[test]
1531    fn physical_plan_config_no_projection() {
1532        let file_schema = aggr_test_schema();
1533        let conf = config_for_projection(
1534            Arc::clone(&file_schema),
1535            None,
1536            Statistics::new_unknown(&file_schema),
1537            to_partition_cols(vec![(
1538                "date".to_owned(),
1539                wrap_partition_type_in_dict(DataType::Utf8),
1540            )]),
1541        );
1542
1543        let (proj_schema, _, proj_statistics, _) = conf.project();
1544        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1545        assert_eq!(
1546            proj_schema.field(file_schema.fields().len()).name(),
1547            "date",
1548            "partition columns are the last columns"
1549        );
1550        assert_eq!(
1551            proj_statistics.column_statistics.len(),
1552            file_schema.fields().len() + 1
1553        );
1554        // TODO implement tests for partition column statistics once implemented
1555
1556        let col_names = conf.projected_file_column_names();
1557        assert_eq!(col_names, None);
1558
1559        let col_indices = conf.file_column_projection_indices();
1560        assert_eq!(col_indices, None);
1561    }
1562
1563    #[test]
1564    fn physical_plan_config_no_projection_tab_cols_as_field() {
1565        let file_schema = aggr_test_schema();
1566
1567        // make a table_partition_col as a field
1568        let table_partition_col =
1569            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1570                .with_metadata(HashMap::from_iter(vec![(
1571                    "key_whatever".to_owned(),
1572                    "value_whatever".to_owned(),
1573                )]));
1574
1575        let conf = config_for_projection(
1576            Arc::clone(&file_schema),
1577            None,
1578            Statistics::new_unknown(&file_schema),
1579            vec![table_partition_col.clone()],
1580        );
1581
1582        // verify the proj_schema includes the last column and exactly the same the field it is defined
1583        let proj_schema = conf.projected_schema();
1584        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1585        assert_eq!(
1586            *proj_schema.field(file_schema.fields().len()),
1587            table_partition_col,
1588            "partition columns are the last columns and ust have all values defined in created field"
1589        );
1590    }
1591
1592    #[test]
1593    fn physical_plan_config_with_projection() {
1594        let file_schema = aggr_test_schema();
1595        let conf = config_for_projection(
1596            Arc::clone(&file_schema),
1597            Some(vec![file_schema.fields().len(), 0]),
1598            Statistics {
1599                num_rows: Precision::Inexact(10),
1600                // assign the column index to distinct_count to help assert
1601                // the source statistic after the projection
1602                column_statistics: (0..file_schema.fields().len())
1603                    .map(|i| ColumnStatistics {
1604                        distinct_count: Precision::Inexact(i),
1605                        ..Default::default()
1606                    })
1607                    .collect(),
1608                total_byte_size: Precision::Absent,
1609            },
1610            to_partition_cols(vec![(
1611                "date".to_owned(),
1612                wrap_partition_type_in_dict(DataType::Utf8),
1613            )]),
1614        );
1615
1616        let (proj_schema, _, proj_statistics, _) = conf.project();
1617        assert_eq!(
1618            columns(&proj_schema),
1619            vec!["date".to_owned(), "c1".to_owned()]
1620        );
1621        let proj_stat_cols = proj_statistics.column_statistics;
1622        assert_eq!(proj_stat_cols.len(), 2);
1623        // TODO implement tests for proj_stat_cols[0] once partition column
1624        // statistics are implemented
1625        assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));
1626
1627        let col_names = conf.projected_file_column_names();
1628        assert_eq!(col_names, Some(vec!["c1".to_owned()]));
1629
1630        let col_indices = conf.file_column_projection_indices();
1631        assert_eq!(col_indices, Some(vec![0]));
1632    }
1633
1634    #[test]
1635    fn partition_column_projector() {
1636        let file_batch = build_table_i32(
1637            ("a", &vec![0, 1, 2]),
1638            ("b", &vec![-2, -1, 0]),
1639            ("c", &vec![10, 11, 12]),
1640        );
1641        let partition_cols = vec![
1642            (
1643                "year".to_owned(),
1644                wrap_partition_type_in_dict(DataType::Utf8),
1645            ),
1646            (
1647                "month".to_owned(),
1648                wrap_partition_type_in_dict(DataType::Utf8),
1649            ),
1650            (
1651                "day".to_owned(),
1652                wrap_partition_type_in_dict(DataType::Utf8),
1653            ),
1654        ];
1655        // create a projected schema
1656        let statistics = Statistics {
1657            num_rows: Precision::Inexact(3),
1658            total_byte_size: Precision::Absent,
1659            column_statistics: Statistics::unknown_column(&file_batch.schema()),
1660        };
1661
1662        let conf = config_for_projection(
1663            file_batch.schema(),
1664            // keep all cols from file and 2 from partitioning
1665            Some(vec![
1666                0,
1667                1,
1668                2,
1669                file_batch.schema().fields().len(),
1670                file_batch.schema().fields().len() + 2,
1671            ]),
1672            statistics.clone(),
1673            to_partition_cols(partition_cols.clone()),
1674        );
1675
1676        let source_statistics = conf.file_source.statistics().unwrap();
1677        let conf_stats = conf.partition_statistics(None).unwrap();
1678
1679        // projection should be reflected in the file source statistics
1680        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
1681
1682        // 3 original statistics + 2 partition statistics
1683        assert_eq!(conf_stats.column_statistics.len(), 5);
1684
1685        // file statics should not be modified
1686        assert_eq!(source_statistics, statistics);
1687        assert_eq!(source_statistics.column_statistics.len(), 3);
1688
1689        let proj_schema = conf.projected_schema();
1690        // created a projector for that projected schema
1691        let mut proj = PartitionColumnProjector::new(
1692            proj_schema,
1693            &partition_cols
1694                .iter()
1695                .map(|x| x.0.clone())
1696                .collect::<Vec<_>>(),
1697        );
1698
1699        // project first batch
1700        let projected_batch = proj
1701            .project(
1702                // file_batch is ok here because we kept all the file cols in the projection
1703                file_batch,
1704                &[
1705                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1706                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1707                    wrap_partition_value_in_dict(ScalarValue::from("26")),
1708                ],
1709            )
1710            .expect("Projection of partition columns into record batch failed");
1711        let expected = [
1712            "+---+----+----+------+-----+",
1713            "| a | b  | c  | year | day |",
1714            "+---+----+----+------+-----+",
1715            "| 0 | -2 | 10 | 2021 | 26  |",
1716            "| 1 | -1 | 11 | 2021 | 26  |",
1717            "| 2 | 0  | 12 | 2021 | 26  |",
1718            "+---+----+----+------+-----+",
1719        ];
1720        assert_batches_eq!(expected, &[projected_batch]);
1721
1722        // project another batch that is larger than the previous one
1723        let file_batch = build_table_i32(
1724            ("a", &vec![5, 6, 7, 8, 9]),
1725            ("b", &vec![-10, -9, -8, -7, -6]),
1726            ("c", &vec![12, 13, 14, 15, 16]),
1727        );
1728        let projected_batch = proj
1729            .project(
1730                // file_batch is ok here because we kept all the file cols in the projection
1731                file_batch,
1732                &[
1733                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1734                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1735                    wrap_partition_value_in_dict(ScalarValue::from("27")),
1736                ],
1737            )
1738            .expect("Projection of partition columns into record batch failed");
1739        let expected = [
1740            "+---+-----+----+------+-----+",
1741            "| a | b   | c  | year | day |",
1742            "+---+-----+----+------+-----+",
1743            "| 5 | -10 | 12 | 2021 | 27  |",
1744            "| 6 | -9  | 13 | 2021 | 27  |",
1745            "| 7 | -8  | 14 | 2021 | 27  |",
1746            "| 8 | -7  | 15 | 2021 | 27  |",
1747            "| 9 | -6  | 16 | 2021 | 27  |",
1748            "+---+-----+----+------+-----+",
1749        ];
1750        assert_batches_eq!(expected, &[projected_batch]);
1751
1752        // project another batch that is smaller than the previous one
1753        let file_batch = build_table_i32(
1754            ("a", &vec![0, 1, 3]),
1755            ("b", &vec![2, 3, 4]),
1756            ("c", &vec![4, 5, 6]),
1757        );
1758        let projected_batch = proj
1759            .project(
1760                // file_batch is ok here because we kept all the file cols in the projection
1761                file_batch,
1762                &[
1763                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1764                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1765                    wrap_partition_value_in_dict(ScalarValue::from("28")),
1766                ],
1767            )
1768            .expect("Projection of partition columns into record batch failed");
1769        let expected = [
1770            "+---+---+---+------+-----+",
1771            "| a | b | c | year | day |",
1772            "+---+---+---+------+-----+",
1773            "| 0 | 2 | 4 | 2021 | 28  |",
1774            "| 1 | 3 | 5 | 2021 | 28  |",
1775            "| 3 | 4 | 6 | 2021 | 28  |",
1776            "+---+---+---+------+-----+",
1777        ];
1778        assert_batches_eq!(expected, &[projected_batch]);
1779
1780        // forgot to dictionary-wrap the scalar value
1781        let file_batch = build_table_i32(
1782            ("a", &vec![0, 1, 2]),
1783            ("b", &vec![-2, -1, 0]),
1784            ("c", &vec![10, 11, 12]),
1785        );
1786        let projected_batch = proj
1787            .project(
1788                // file_batch is ok here because we kept all the file cols in the projection
1789                file_batch,
1790                &[
1791                    ScalarValue::from("2021"),
1792                    ScalarValue::from("10"),
1793                    ScalarValue::from("26"),
1794                ],
1795            )
1796            .expect("Projection of partition columns into record batch failed");
1797        let expected = [
1798            "+---+----+----+------+-----+",
1799            "| a | b  | c  | year | day |",
1800            "+---+----+----+------+-----+",
1801            "| 0 | -2 | 10 | 2021 | 26  |",
1802            "| 1 | -1 | 11 | 2021 | 26  |",
1803            "| 2 | 0  | 12 | 2021 | 26  |",
1804            "+---+----+----+------+-----+",
1805        ];
1806        assert_batches_eq!(expected, &[projected_batch]);
1807    }
1808
1809    #[test]
1810    fn test_projected_file_schema_with_partition_col() {
1811        let schema = aggr_test_schema();
1812        let partition_cols = vec![
1813            (
1814                "part1".to_owned(),
1815                wrap_partition_type_in_dict(DataType::Utf8),
1816            ),
1817            (
1818                "part2".to_owned(),
1819                wrap_partition_type_in_dict(DataType::Utf8),
1820            ),
1821        ];
1822
1823        // Projected file schema for config with projection including partition column
1824        let projection = config_for_projection(
1825            schema.clone(),
1826            Some(vec![0, 3, 5, schema.fields().len()]),
1827            Statistics::new_unknown(&schema),
1828            to_partition_cols(partition_cols),
1829        )
1830        .projected_file_schema();
1831
1832        // Assert partition column filtered out in projected file schema
1833        let expected_columns = vec!["c1", "c4", "c6"];
1834        let actual_columns = projection
1835            .fields()
1836            .iter()
1837            .map(|f| f.name().clone())
1838            .collect::<Vec<_>>();
1839        assert_eq!(expected_columns, actual_columns);
1840    }
1841
1842    #[test]
1843    fn test_projected_file_schema_without_projection() {
1844        let schema = aggr_test_schema();
1845        let partition_cols = vec![
1846            (
1847                "part1".to_owned(),
1848                wrap_partition_type_in_dict(DataType::Utf8),
1849            ),
1850            (
1851                "part2".to_owned(),
1852                wrap_partition_type_in_dict(DataType::Utf8),
1853            ),
1854        ];
1855
1856        // Projected file schema for config without projection
1857        let projection = config_for_projection(
1858            schema.clone(),
1859            None,
1860            Statistics::new_unknown(&schema),
1861            to_partition_cols(partition_cols),
1862        )
1863        .projected_file_schema();
1864
1865        // Assert projected file schema is equal to file schema
1866        assert_eq!(projection.fields(), schema.fields());
1867    }
1868
1869    #[test]
1870    fn test_split_groups_by_statistics() -> Result<()> {
1871        use chrono::TimeZone;
1872        use datafusion_common::DFSchema;
1873        use datafusion_expr::execution_props::ExecutionProps;
1874        use object_store::{path::Path, ObjectMeta};
1875
1876        struct File {
1877            name: &'static str,
1878            date: &'static str,
1879            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1880        }
1881        impl File {
1882            fn new(
1883                name: &'static str,
1884                date: &'static str,
1885                statistics: Vec<Option<(f64, f64)>>,
1886            ) -> Self {
1887                Self::new_nullable(
1888                    name,
1889                    date,
1890                    statistics
1891                        .into_iter()
1892                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1893                        .collect(),
1894                )
1895            }
1896
1897            fn new_nullable(
1898                name: &'static str,
1899                date: &'static str,
1900                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1901            ) -> Self {
1902                Self {
1903                    name,
1904                    date,
1905                    statistics,
1906                }
1907            }
1908        }
1909
1910        struct TestCase {
1911            name: &'static str,
1912            file_schema: Schema,
1913            files: Vec<File>,
1914            sort: Vec<SortExpr>,
1915            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1916        }
1917
1918        use datafusion_expr::col;
1919        let cases = vec![
1920            TestCase {
1921                name: "test sort",
1922                file_schema: Schema::new(vec![Field::new(
1923                    "value".to_string(),
1924                    DataType::Float64,
1925                    false,
1926                )]),
1927                files: vec![
1928                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1929                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1930                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1931                ],
1932                sort: vec![col("value").sort(true, false)],
1933                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1934            },
1935            // same input but file '2' is in the middle
1936            // test that we still order correctly
1937            TestCase {
1938                name: "test sort with files ordered differently",
1939                file_schema: Schema::new(vec![Field::new(
1940                    "value".to_string(),
1941                    DataType::Float64,
1942                    false,
1943                )]),
1944                files: vec![
1945                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1946                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1947                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1948                ],
1949                sort: vec![col("value").sort(true, false)],
1950                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1951            },
1952            TestCase {
1953                name: "reverse sort",
1954                file_schema: Schema::new(vec![Field::new(
1955                    "value".to_string(),
1956                    DataType::Float64,
1957                    false,
1958                )]),
1959                files: vec![
1960                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1961                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1962                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1963                ],
1964                sort: vec![col("value").sort(false, true)],
1965                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1966            },
1967            TestCase {
1968                name: "nullable sort columns, nulls last",
1969                file_schema: Schema::new(vec![Field::new(
1970                    "value".to_string(),
1971                    DataType::Float64,
1972                    true,
1973                )]),
1974                files: vec![
1975                    File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
1976                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
1977                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
1978                ],
1979                sort: vec![col("value").sort(true, false)],
1980                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
1981            },
1982            TestCase {
1983                name: "nullable sort columns, nulls first",
1984                file_schema: Schema::new(vec![Field::new(
1985                    "value".to_string(),
1986                    DataType::Float64,
1987                    true,
1988                )]),
1989                files: vec![
1990                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
1991                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
1992                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
1993                ],
1994                sort: vec![col("value").sort(true, true)],
1995                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
1996            },
1997            TestCase {
1998                name: "all three non-overlapping",
1999                file_schema: Schema::new(vec![Field::new(
2000                    "value".to_string(),
2001                    DataType::Float64,
2002                    false,
2003                )]),
2004                files: vec![
2005                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2006                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
2007                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
2008                ],
2009                sort: vec![col("value").sort(true, false)],
2010                expected_result: Ok(vec![vec!["0", "1", "2"]]),
2011            },
2012            TestCase {
2013                name: "all three overlapping",
2014                file_schema: Schema::new(vec![Field::new(
2015                    "value".to_string(),
2016                    DataType::Float64,
2017                    false,
2018                )]),
2019                files: vec![
2020                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2021                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
2022                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
2023                ],
2024                sort: vec![col("value").sort(true, false)],
2025                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
2026            },
2027            TestCase {
2028                name: "empty input",
2029                file_schema: Schema::new(vec![Field::new(
2030                    "value".to_string(),
2031                    DataType::Float64,
2032                    false,
2033                )]),
2034                files: vec![],
2035                sort: vec![col("value").sort(true, false)],
2036                expected_result: Ok(vec![]),
2037            },
2038            TestCase {
2039                name: "one file missing statistics",
2040                file_schema: Schema::new(vec![Field::new(
2041                    "value".to_string(),
2042                    DataType::Float64,
2043                    false,
2044                )]),
2045                files: vec![
2046                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2047                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
2048                    File::new("2", "2023-01-02", vec![None]),
2049                ],
2050                sort: vec![col("value").sort(true, false)],
2051                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
2052            },
2053        ];
2054
2055        for case in cases {
2056            let table_schema = Arc::new(Schema::new(
2057                case.file_schema
2058                    .fields()
2059                    .clone()
2060                    .into_iter()
2061                    .cloned()
2062                    .chain(Some(Arc::new(Field::new(
2063                        "date".to_string(),
2064                        DataType::Utf8,
2065                        false,
2066                    ))))
2067                    .collect::<Vec<_>>(),
2068            ));
2069            let Some(sort_order) = LexOrdering::new(
2070                case.sort
2071                    .into_iter()
2072                    .map(|expr| {
2073                        create_physical_sort_expr(
2074                            &expr,
2075                            &DFSchema::try_from(Arc::clone(&table_schema))?,
2076                            &ExecutionProps::default(),
2077                        )
2078                    })
2079                    .collect::<Result<Vec<_>>>()?,
2080            ) else {
2081                return internal_err!("This test should always use an ordering");
2082            };
2083
2084            let partitioned_files = FileGroup::new(
2085                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
2086            );
2087            let result = FileScanConfig::split_groups_by_statistics(
2088                &table_schema,
2089                std::slice::from_ref(&partitioned_files),
2090                &sort_order,
2091            );
2092            let results_by_name = result
2093                .as_ref()
2094                .map(|file_groups| {
2095                    file_groups
2096                        .iter()
2097                        .map(|file_group| {
2098                            file_group
2099                                .iter()
2100                                .map(|file| {
2101                                    partitioned_files
2102                                        .iter()
2103                                        .find_map(|f| {
2104                                            if f.object_meta == file.object_meta {
2105                                                Some(
2106                                                    f.object_meta
2107                                                        .location
2108                                                        .as_ref()
2109                                                        .rsplit('/')
2110                                                        .next()
2111                                                        .unwrap()
2112                                                        .trim_end_matches(".parquet"),
2113                                                )
2114                                            } else {
2115                                                None
2116                                            }
2117                                        })
2118                                        .unwrap()
2119                                })
2120                                .collect::<Vec<_>>()
2121                        })
2122                        .collect::<Vec<_>>()
2123                })
2124                .map_err(|e| e.strip_backtrace().leak() as &'static str);
2125
2126            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
2127        }
2128
2129        return Ok(());
2130
2131        impl From<File> for PartitionedFile {
2132            fn from(file: File) -> Self {
2133                PartitionedFile {
2134                    object_meta: ObjectMeta {
2135                        location: Path::from(format!(
2136                            "data/date={}/{}.parquet",
2137                            file.date, file.name
2138                        )),
2139                        last_modified: chrono::Utc.timestamp_nanos(0),
2140                        size: 0,
2141                        e_tag: None,
2142                        version: None,
2143                    },
2144                    partition_values: vec![ScalarValue::from(file.date)],
2145                    range: None,
2146                    statistics: Some(Arc::new(Statistics {
2147                        num_rows: Precision::Absent,
2148                        total_byte_size: Precision::Absent,
2149                        column_statistics: file
2150                            .statistics
2151                            .into_iter()
2152                            .map(|stats| {
2153                                stats
2154                                    .map(|(min, max)| ColumnStatistics {
2155                                        min_value: Precision::Exact(
2156                                            ScalarValue::Float64(min),
2157                                        ),
2158                                        max_value: Precision::Exact(
2159                                            ScalarValue::Float64(max),
2160                                        ),
2161                                        ..Default::default()
2162                                    })
2163                                    .unwrap_or_default()
2164                            })
2165                            .collect::<Vec<_>>(),
2166                    })),
2167                    extensions: None,
2168                    metadata_size_hint: None,
2169                }
2170            }
2171        }
2172    }
2173
2174    // sets default for configs that play no role in projections
2175    fn config_for_projection(
2176        file_schema: SchemaRef,
2177        projection: Option<Vec<usize>>,
2178        statistics: Statistics,
2179        table_partition_cols: Vec<Field>,
2180    ) -> FileScanConfig {
2181        FileScanConfigBuilder::new(
2182            ObjectStoreUrl::parse("test:///").unwrap(),
2183            file_schema,
2184            Arc::new(MockSource::default()),
2185        )
2186        .with_projection_indices(projection)
2187        .with_statistics(statistics)
2188        .with_table_partition_cols(table_partition_cols)
2189        .build()
2190    }
2191
2192    /// Convert partition columns from Vec<String DataType> to Vec<Field>
2193    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
2194        table_partition_cols
2195            .iter()
2196            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
2197            .collect::<Vec<_>>()
2198    }
2199
2200    /// returns record batch with 3 columns of i32 in memory
2201    pub fn build_table_i32(
2202        a: (&str, &Vec<i32>),
2203        b: (&str, &Vec<i32>),
2204        c: (&str, &Vec<i32>),
2205    ) -> RecordBatch {
2206        let schema = Schema::new(vec![
2207            Field::new(a.0, DataType::Int32, false),
2208            Field::new(b.0, DataType::Int32, false),
2209            Field::new(c.0, DataType::Int32, false),
2210        ]);
2211
2212        RecordBatch::try_new(
2213            Arc::new(schema),
2214            vec![
2215                Arc::new(Int32Array::from(a.1.clone())),
2216                Arc::new(Int32Array::from(b.1.clone())),
2217                Arc::new(Int32Array::from(c.1.clone())),
2218            ],
2219        )
2220        .unwrap()
2221    }
2222
2223    #[test]
2224    fn test_file_scan_config_builder() {
2225        let file_schema = aggr_test_schema();
2226        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2227        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2228
2229        // Create a builder with required parameters
2230        let builder = FileScanConfigBuilder::new(
2231            object_store_url.clone(),
2232            Arc::clone(&file_schema),
2233            Arc::clone(&file_source),
2234        );
2235
2236        // Build with various configurations
2237        let config = builder
2238            .with_limit(Some(1000))
2239            .with_projection_indices(Some(vec![0, 1]))
2240            .with_table_partition_cols(vec![Field::new(
2241                "date",
2242                wrap_partition_type_in_dict(DataType::Utf8),
2243                false,
2244            )])
2245            .with_statistics(Statistics::new_unknown(&file_schema))
2246            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
2247                "test.parquet".to_string(),
2248                1024,
2249            )])])
2250            .with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
2251                Column::new("date", 0),
2252            ))]
2253            .into()])
2254            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
2255            .with_newlines_in_values(true)
2256            .build();
2257
2258        // Verify the built config has all the expected values
2259        assert_eq!(config.object_store_url, object_store_url);
2260        assert_eq!(*config.file_schema(), file_schema);
2261        assert_eq!(config.limit, Some(1000));
2262        assert_eq!(
2263            config.projection_exprs.as_ref().map(|p| p.column_indices()),
2264            Some(vec![0, 1])
2265        );
2266        assert_eq!(config.table_partition_cols().len(), 1);
2267        assert_eq!(config.table_partition_cols()[0].name(), "date");
2268        assert_eq!(config.file_groups.len(), 1);
2269        assert_eq!(config.file_groups[0].len(), 1);
2270        assert_eq!(
2271            config.file_groups[0][0].object_meta.location.as_ref(),
2272            "test.parquet"
2273        );
2274        assert_eq!(
2275            config.file_compression_type,
2276            FileCompressionType::UNCOMPRESSED
2277        );
2278        assert!(config.new_lines_in_values);
2279        assert_eq!(config.output_ordering.len(), 1);
2280    }
2281
2282    #[test]
2283    fn equivalence_properties_after_schema_change() {
2284        let file_schema = aggr_test_schema();
2285        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2286        // Create a file source with a filter
2287        let file_source: Arc<dyn FileSource> =
2288            Arc::new(MockSource::default().with_filter(Arc::new(BinaryExpr::new(
2289                col("c2", &file_schema).unwrap(),
2290                Operator::Eq,
2291                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2292            ))));
2293
2294        let config = FileScanConfigBuilder::new(
2295            object_store_url.clone(),
2296            Arc::clone(&file_schema),
2297            Arc::clone(&file_source),
2298        )
2299        .with_projection_indices(Some(vec![0, 1, 2]))
2300        .build();
2301
2302        // Simulate projection being updated. Since the filter has already been pushed down,
2303        // the new projection won't include the filtered column.
2304        let data_source = config
2305            .try_swapping_with_projection(&[ProjectionExpr::new(
2306                col("c3", &file_schema).unwrap(),
2307                "c3".to_string(),
2308            )])
2309            .unwrap()
2310            .unwrap();
2311
2312        // Gather the equivalence properties from the new data source. There should
2313        // be no equivalence class for column c2 since it was removed by the projection.
2314        let eq_properties = data_source.eq_properties();
2315        let eq_group = eq_properties.eq_group();
2316
2317        for class in eq_group.iter() {
2318            for expr in class.iter() {
2319                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
2320                    assert_ne!(
2321                        col.name(),
2322                        "c2",
2323                        "c2 should not be present in any equivalence class"
2324                    );
2325                }
2326            }
2327        }
2328    }
2329
2330    #[test]
2331    fn test_file_scan_config_builder_defaults() {
2332        let file_schema = aggr_test_schema();
2333        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2334        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2335
2336        // Create a builder with only required parameters and build without any additional configurations
2337        let config = FileScanConfigBuilder::new(
2338            object_store_url.clone(),
2339            Arc::clone(&file_schema),
2340            Arc::clone(&file_source),
2341        )
2342        .build();
2343
2344        // Verify default values
2345        assert_eq!(config.object_store_url, object_store_url);
2346        assert_eq!(*config.file_schema(), file_schema);
2347        assert_eq!(config.limit, None);
2348        assert_eq!(
2349            config.projection_exprs.as_ref().map(|p| p.column_indices()),
2350            None
2351        );
2352        assert!(config.table_partition_cols().is_empty());
2353        assert!(config.file_groups.is_empty());
2354        assert_eq!(
2355            config.file_compression_type,
2356            FileCompressionType::UNCOMPRESSED
2357        );
2358        assert!(!config.new_lines_in_values);
2359        assert!(config.output_ordering.is_empty());
2360        assert!(config.constraints.is_empty());
2361
2362        // Verify statistics are set to unknown
2363        assert_eq!(
2364            config.file_source.statistics().unwrap().num_rows,
2365            Precision::Absent
2366        );
2367        assert_eq!(
2368            config.file_source.statistics().unwrap().total_byte_size,
2369            Precision::Absent
2370        );
2371        assert_eq!(
2372            config
2373                .file_source
2374                .statistics()
2375                .unwrap()
2376                .column_statistics
2377                .len(),
2378            file_schema.fields().len()
2379        );
2380        for stat in config.file_source.statistics().unwrap().column_statistics {
2381            assert_eq!(stat.distinct_count, Precision::Absent);
2382            assert_eq!(stat.min_value, Precision::Absent);
2383            assert_eq!(stat.max_value, Precision::Absent);
2384            assert_eq!(stat.null_count, Precision::Absent);
2385        }
2386    }
2387
2388    #[test]
2389    fn test_file_scan_config_builder_new_from() {
2390        let schema = aggr_test_schema();
2391        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2392        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2393        let partition_cols = vec![Field::new(
2394            "date",
2395            wrap_partition_type_in_dict(DataType::Utf8),
2396            false,
2397        )];
2398        let file = PartitionedFile::new("test_file.parquet", 100);
2399
2400        // Create a config with non-default values
2401        let original_config = FileScanConfigBuilder::new(
2402            object_store_url.clone(),
2403            Arc::clone(&schema),
2404            Arc::clone(&file_source),
2405        )
2406        .with_projection_indices(Some(vec![0, 2]))
2407        .with_limit(Some(10))
2408        .with_table_partition_cols(partition_cols.clone())
2409        .with_file(file.clone())
2410        .with_constraints(Constraints::default())
2411        .with_newlines_in_values(true)
2412        .build();
2413
2414        // Create a new builder from the config
2415        let new_builder = FileScanConfigBuilder::from(original_config);
2416
2417        // Build a new config from this builder
2418        let new_config = new_builder.build();
2419
2420        // Verify properties match
2421        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2422        assert_eq!(new_config.object_store_url, object_store_url);
2423        assert_eq!(*new_config.file_schema(), schema);
2424        assert_eq!(
2425            new_config
2426                .projection_exprs
2427                .as_ref()
2428                .map(|p| p.column_indices()),
2429            Some(vec![0, 2])
2430        );
2431        assert_eq!(new_config.limit, Some(10));
2432        assert_eq!(*new_config.table_partition_cols(), partition_cols);
2433        assert_eq!(new_config.file_groups.len(), 1);
2434        assert_eq!(new_config.file_groups[0].len(), 1);
2435        assert_eq!(
2436            new_config.file_groups[0][0].object_meta.location.as_ref(),
2437            "test_file.parquet"
2438        );
2439        assert_eq!(new_config.constraints, Constraints::default());
2440        assert!(new_config.new_lines_in_values);
2441    }
2442
2443    #[test]
2444    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2445        use datafusion_common::DFSchema;
2446        use datafusion_expr::{col, execution_props::ExecutionProps};
2447
2448        let schema = Arc::new(Schema::new(vec![Field::new(
2449            "value",
2450            DataType::Float64,
2451            false,
2452        )]));
2453
2454        // Setup sort expression
2455        let exec_props = ExecutionProps::new();
2456        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2457        let sort_expr = [col("value").sort(true, false)];
2458        let sort_ordering = sort_expr
2459            .map(|expr| {
2460                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
2461            })
2462            .into();
2463
2464        // Test case parameters
2465        struct TestCase {
2466            name: String,
2467            file_count: usize,
2468            overlap_factor: f64,
2469            target_partitions: usize,
2470            expected_partition_count: usize,
2471        }
2472
2473        let test_cases = vec![
2474            // Basic cases
2475            TestCase {
2476                name: "no_overlap_10_files_4_partitions".to_string(),
2477                file_count: 10,
2478                overlap_factor: 0.0,
2479                target_partitions: 4,
2480                expected_partition_count: 4,
2481            },
2482            TestCase {
2483                name: "medium_overlap_20_files_5_partitions".to_string(),
2484                file_count: 20,
2485                overlap_factor: 0.5,
2486                target_partitions: 5,
2487                expected_partition_count: 5,
2488            },
2489            TestCase {
2490                name: "high_overlap_30_files_3_partitions".to_string(),
2491                file_count: 30,
2492                overlap_factor: 0.8,
2493                target_partitions: 3,
2494                expected_partition_count: 7,
2495            },
2496            // Edge cases
2497            TestCase {
2498                name: "fewer_files_than_partitions".to_string(),
2499                file_count: 3,
2500                overlap_factor: 0.0,
2501                target_partitions: 10,
2502                expected_partition_count: 3, // Should only create as many partitions as files
2503            },
2504            TestCase {
2505                name: "single_file".to_string(),
2506                file_count: 1,
2507                overlap_factor: 0.0,
2508                target_partitions: 5,
2509                expected_partition_count: 1, // Should create only one partition
2510            },
2511            TestCase {
2512                name: "empty_files".to_string(),
2513                file_count: 0,
2514                overlap_factor: 0.0,
2515                target_partitions: 3,
2516                expected_partition_count: 0, // Empty result for empty input
2517            },
2518        ];
2519
2520        for case in test_cases {
2521            println!("Running test case: {}", case.name);
2522
2523            // Generate files using bench utility function
2524            let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2525
2526            // Call the function under test
2527            let result =
2528                FileScanConfig::split_groups_by_statistics_with_target_partitions(
2529                    &schema,
2530                    &file_groups,
2531                    &sort_ordering,
2532                    case.target_partitions,
2533                )?;
2534
2535            // Verify results
2536            println!(
2537                "Created {} partitions (target was {})",
2538                result.len(),
2539                case.target_partitions
2540            );
2541
2542            // Check partition count
2543            assert_eq!(
2544                result.len(),
2545                case.expected_partition_count,
2546                "Case '{}': Unexpected partition count",
2547                case.name
2548            );
2549
2550            // Verify sort integrity
2551            assert!(
2552                verify_sort_integrity(&result),
2553                "Case '{}': Files within partitions are not properly ordered",
2554                case.name
2555            );
2556
2557            // Distribution check for partitions
2558            if case.file_count > 1 && case.expected_partition_count > 1 {
2559                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2560                let max_size = *group_sizes.iter().max().unwrap();
2561                let min_size = *group_sizes.iter().min().unwrap();
2562
2563                // Check partition balancing - difference shouldn't be extreme
2564                let avg_files_per_partition =
2565                    case.file_count as f64 / case.expected_partition_count as f64;
2566                assert!(
2567                    (max_size as f64) < 2.0 * avg_files_per_partition,
2568                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2569                    case.name,
2570                    max_size,
2571                    avg_files_per_partition
2572                );
2573
2574                println!("Distribution - min files: {min_size}, max files: {max_size}");
2575            }
2576        }
2577
2578        // Test error case: zero target partitions
2579        let empty_groups: Vec<FileGroup> = vec![];
2580        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2581            &schema,
2582            &empty_groups,
2583            &sort_ordering,
2584            0,
2585        )
2586        .unwrap_err();
2587
2588        assert!(
2589            err.to_string()
2590                .contains("target_partitions must be greater than 0"),
2591            "Expected error for zero target partitions"
2592        );
2593
2594        Ok(())
2595    }
2596
2597    #[test]
2598    fn test_partition_statistics_projection() {
2599        // This test verifies that partition_statistics applies projection correctly.
2600        // The old implementation had a bug where it returned file group statistics
2601        // without applying the projection, returning all column statistics instead
2602        // of just the projected ones.
2603
2604        use crate::source::DataSourceExec;
2605        use datafusion_physical_plan::ExecutionPlan;
2606
2607        // Create a schema with 4 columns
2608        let schema = Arc::new(Schema::new(vec![
2609            Field::new("col0", DataType::Int32, false),
2610            Field::new("col1", DataType::Int32, false),
2611            Field::new("col2", DataType::Int32, false),
2612            Field::new("col3", DataType::Int32, false),
2613        ]));
2614
2615        // Create statistics for all 4 columns
2616        let file_group_stats = Statistics {
2617            num_rows: Precision::Exact(100),
2618            total_byte_size: Precision::Exact(1024),
2619            column_statistics: vec![
2620                ColumnStatistics {
2621                    null_count: Precision::Exact(0),
2622                    ..ColumnStatistics::new_unknown()
2623                },
2624                ColumnStatistics {
2625                    null_count: Precision::Exact(5),
2626                    ..ColumnStatistics::new_unknown()
2627                },
2628                ColumnStatistics {
2629                    null_count: Precision::Exact(10),
2630                    ..ColumnStatistics::new_unknown()
2631                },
2632                ColumnStatistics {
2633                    null_count: Precision::Exact(15),
2634                    ..ColumnStatistics::new_unknown()
2635                },
2636            ],
2637        };
2638
2639        // Create a file group with statistics
2640        let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2641            .with_statistics(Arc::new(file_group_stats));
2642
2643        // Create a FileScanConfig with projection: only keep columns 0 and 2
2644        let config = FileScanConfigBuilder::new(
2645            ObjectStoreUrl::parse("test:///").unwrap(),
2646            Arc::clone(&schema),
2647            Arc::new(MockSource::default()),
2648        )
2649        .with_projection_indices(Some(vec![0, 2])) // Only project columns 0 and 2
2650        .with_file_groups(vec![file_group])
2651        .build();
2652
2653        // Create a DataSourceExec from the config
2654        let exec = DataSourceExec::from_data_source(config);
2655
2656        // Get statistics for partition 0
2657        let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2658
2659        // Verify that only 2 columns are in the statistics (the projected ones)
2660        assert_eq!(
2661            partition_stats.column_statistics.len(),
2662            2,
2663            "Expected 2 column statistics (projected), but got {}",
2664            partition_stats.column_statistics.len()
2665        );
2666
2667        // Verify the column statistics are for columns 0 and 2
2668        assert_eq!(
2669            partition_stats.column_statistics[0].null_count,
2670            Precision::Exact(0),
2671            "First projected column should be col0 with 0 nulls"
2672        );
2673        assert_eq!(
2674            partition_stats.column_statistics[1].null_count,
2675            Precision::Exact(10),
2676            "Second projected column should be col2 with 10 nulls"
2677        );
2678
2679        // Verify row count and byte size are preserved
2680        assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2681        assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
2682    }
2683}