datafusion_datasource/file_scan_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21use std::{
22    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
24};
25
26use crate::file_groups::FileGroup;
27#[allow(unused_imports)]
28use crate::schema_adapter::SchemaAdapterFactory;
29use crate::{
30    display::FileGroupsDisplay,
31    file::FileSource,
32    file_compression_type::FileCompressionType,
33    file_stream::FileStream,
34    source::{DataSource, DataSourceExec},
35    statistics::MinMaxStatistics,
36    PartitionedFile,
37};
38use arrow::datatypes::FieldRef;
39use arrow::{
40    array::{
41        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
42        RecordBatchOptions,
43    },
44    buffer::Buffer,
45    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
46};
47use datafusion_common::config::ConfigOptions;
48use datafusion_common::{
49    exec_err, ColumnStatistics, Constraints, DataFusionError, Result, ScalarValue,
50    Statistics,
51};
52use datafusion_execution::{
53    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
54};
55use datafusion_physical_expr::expressions::Column;
56use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
57use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
58use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
59use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
60use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
61use datafusion_physical_plan::{
62    display::{display_orderings, ProjectSchemaDisplay},
63    metrics::ExecutionPlanMetricsSet,
64    projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
65    DisplayAs, DisplayFormatType, ExecutionPlan,
66};
67
68use datafusion_physical_plan::coop::cooperative;
69use datafusion_physical_plan::execution_plan::SchedulingType;
70use log::{debug, warn};
71
/// The base configuration for a [`DataSourceExec`], the physical plan for
/// any given file format.
///
/// Use [`Self::build`] to create a [`DataSourceExec`] from a [`FileScanConfig`].
76///
77/// # Example
78/// ```
79/// # use std::any::Any;
80/// # use std::sync::Arc;
81/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
82/// # use object_store::ObjectStore;
83/// # use datafusion_common::Statistics;
84/// # use datafusion_common::Result;
85/// # use datafusion_datasource::file::FileSource;
86/// # use datafusion_datasource::file_groups::FileGroup;
87/// # use datafusion_datasource::PartitionedFile;
88/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
89/// # use datafusion_datasource::file_stream::FileOpener;
90/// # use datafusion_datasource::source::DataSourceExec;
91/// # use datafusion_execution::object_store::ObjectStoreUrl;
92/// # use datafusion_physical_plan::ExecutionPlan;
93/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
94/// # use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
95/// # let file_schema = Arc::new(Schema::new(vec![
96/// #  Field::new("c1", DataType::Int32, false),
97/// #  Field::new("c2", DataType::Int32, false),
98/// #  Field::new("c3", DataType::Int32, false),
99/// #  Field::new("c4", DataType::Int32, false),
100/// # ]));
/// # // Note: create a mock ParquetSource, as ParquetSource is not in the datasource crate
102/// #[derive(Clone)]
103/// # struct ParquetSource {
104/// #    projected_statistics: Option<Statistics>,
105/// #    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>
106/// # };
107/// # impl FileSource for ParquetSource {
108/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
109/// #  fn as_any(&self) -> &dyn Any { self  }
110/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
111/// #  fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
112/// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
113/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
114/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
115/// #  fn statistics(&self) -> Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
116/// #  fn file_type(&self) -> &str { "parquet" }
117/// #  fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Result<Arc<dyn FileSource>> { Ok(Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} )) }
118/// #  fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { self.schema_adapter_factory.clone() }
119/// #  }
120/// # impl ParquetSource {
121/// #  fn new() -> Self { Self {projected_statistics: None, schema_adapter_factory: None} }
122/// # }
123/// // create FileScan config for reading parquet files from file://
124/// let object_store_url = ObjectStoreUrl::local_filesystem();
125/// let file_source = Arc::new(ParquetSource::new());
126/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
127///   .with_limit(Some(1000))            // read only the first 1000 records
128///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
129///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
130///   .with_file(PartitionedFile::new("file1.parquet", 1234))
///   // Read /tmp/file2.parquet (56 bytes) and /tmp/file3.parquet (78 bytes)
///   // in a single file group
133///   .with_file_group(FileGroup::new(vec![
134///    PartitionedFile::new("file2.parquet", 56),
135///    PartitionedFile::new("file3.parquet", 78),
136///   ])).build();
137/// // create an execution plan from the config
138/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
139/// ```
140#[derive(Clone)]
141pub struct FileScanConfig {
142    /// Object store URL, used to get an [`ObjectStore`] instance from
143    /// [`RuntimeEnv::object_store`]
144    ///
145    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
    /// such as `file://` or `s3://my_bucket`. It should not include the path to the
147    /// file itself. The relevant URL prefix must be registered via
148    /// [`RuntimeEnv::register_object_store`]
149    ///
150    /// [`ObjectStore`]: object_store::ObjectStore
151    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
152    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
153    pub object_store_url: ObjectStoreUrl,
    /// Schema before `projection` is applied. It contains all the columns that may
155    /// appear in the files. It does not include table partition columns
156    /// that may be added.
157    /// Note that this is **not** the schema of the physical files.
158    /// This is the schema that the physical file schema will be
159    /// mapped onto, and the schema that the [`DataSourceExec`] will return.
160    pub file_schema: SchemaRef,
161    /// List of files to be processed, grouped into partitions
162    ///
163    /// Each file must have a schema of `file_schema` or a subset. If
164    /// a particular file has a subset, the missing columns are
165    /// padded with NULLs.
166    ///
167    /// DataFusion may attempt to read each partition of files
168    /// concurrently, however files *within* a partition will be read
169    /// sequentially, one after the next.
170    pub file_groups: Vec<FileGroup>,
171    /// Table constraints
172    pub constraints: Constraints,
173    /// Columns on which to project the data. Indexes that are higher than the
174    /// number of columns of `file_schema` refer to `table_partition_cols`.
175    pub projection: Option<Vec<usize>>,
176    /// The maximum number of records to read from this plan. If `None`,
177    /// all records after filtering are returned.
178    pub limit: Option<usize>,
179    /// The partitioning columns
180    pub table_partition_cols: Vec<FieldRef>,
181    /// All equivalent lexicographical orderings that describe the schema.
182    pub output_ordering: Vec<LexOrdering>,
183    /// File compression type
184    pub file_compression_type: FileCompressionType,
    /// Whether newlines in values are supported for CSV options
186    pub new_lines_in_values: bool,
187    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
188    pub file_source: Arc<dyn FileSource>,
189    /// Batch size while creating new batches
    /// Defaults to the `batch_size` in [`datafusion_common::config::ExecutionOptions`].
191    pub batch_size: Option<usize>,
192    /// Expression adapter used to adapt filters and projections that are pushed down into the scan
193    /// from the logical schema to the physical schema of the file.
194    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
195}
196
/// A builder for [`FileScanConfig`]s.
198///
199/// Example:
200///
201/// ```rust
202/// # use std::sync::Arc;
203/// # use arrow::datatypes::{DataType, Field, Schema};
204/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig};
205/// # use datafusion_datasource::file_compression_type::FileCompressionType;
206/// # use datafusion_datasource::file_groups::FileGroup;
207/// # use datafusion_datasource::PartitionedFile;
208/// # use datafusion_execution::object_store::ObjectStoreUrl;
209/// # use datafusion_common::Statistics;
210/// # use datafusion_datasource::file::FileSource;
211///
212/// # fn main() {
213/// # fn with_source(file_source: Arc<dyn FileSource>) {
214///     // Create a schema for our Parquet files
215///     let schema = Arc::new(Schema::new(vec![
216///         Field::new("id", DataType::Int32, false),
217///         Field::new("value", DataType::Utf8, false),
218///     ]));
219///
220///     // Create a builder for scanning Parquet files from a local filesystem
221///     let config = FileScanConfigBuilder::new(
222///         ObjectStoreUrl::local_filesystem(),
223///         schema,
224///         file_source,
225///     )
226///     // Set a limit of 1000 rows
227///     .with_limit(Some(1000))
228///     // Project only the first column
229///     .with_projection(Some(vec![0]))
230///     // Add partition columns
231///     .with_table_partition_cols(vec![
232///         Field::new("date", DataType::Utf8, false),
233///     ])
234///     // Add a file group with two files
235///     .with_file_group(FileGroup::new(vec![
236///         PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024),
237///         PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048),
238///     ]))
239///     // Set compression type
240///     .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
241///     // Build the final config
242///     .build();
243/// # }
244/// # }
245/// ```
246#[derive(Clone)]
247pub struct FileScanConfigBuilder {
248    object_store_url: ObjectStoreUrl,
249    /// Table schema before any projections or partition columns are applied.
250    ///
251    /// This schema is used to read the files, but is **not** necessarily the
252    /// schema of the physical files. Rather this is the schema that the
253    /// physical file schema will be mapped onto, and the schema that the
254    /// [`DataSourceExec`] will return.
255    ///
256    /// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns.
257    ///
258    /// This probably would be better named `table_schema`
259    file_schema: SchemaRef,
260    file_source: Arc<dyn FileSource>,
261
262    limit: Option<usize>,
263    projection: Option<Vec<usize>>,
264    table_partition_cols: Vec<FieldRef>,
265    constraints: Option<Constraints>,
266    file_groups: Vec<FileGroup>,
267    statistics: Option<Statistics>,
268    output_ordering: Vec<LexOrdering>,
269    file_compression_type: Option<FileCompressionType>,
270    new_lines_in_values: Option<bool>,
271    batch_size: Option<usize>,
272    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
273}
274
275impl FileScanConfigBuilder {
276    /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files.
277    ///
278    /// # Parameters:
279    /// * `object_store_url`: See [`FileScanConfig::object_store_url`]
280    /// * `file_schema`: See [`FileScanConfig::file_schema`]
281    /// * `file_source`: See [`FileScanConfig::file_source`]
282    pub fn new(
283        object_store_url: ObjectStoreUrl,
284        file_schema: SchemaRef,
285        file_source: Arc<dyn FileSource>,
286    ) -> Self {
287        Self {
288            object_store_url,
289            file_schema,
290            file_source,
291            file_groups: vec![],
292            statistics: None,
293            output_ordering: vec![],
294            file_compression_type: None,
295            new_lines_in_values: None,
296            limit: None,
297            projection: None,
298            table_partition_cols: vec![],
299            constraints: None,
300            batch_size: None,
301            expr_adapter_factory: None,
302        }
303    }
304
305    /// Set the maximum number of records to read from this plan. If `None`,
306    /// all records after filtering are returned.
307    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
308        self.limit = limit;
309        self
310    }
311
312    /// Set the file source for scanning files.
313    ///
314    /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
315    /// after the builder has been created.
316    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
317        self.file_source = file_source;
318        self
319    }
320
321    /// Set the columns on which to project the data. Indexes that are higher than the
322    /// number of columns of `file_schema` refer to `table_partition_cols`.
323    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
324        self.projection = projection;
325        self
326    }
327
328    /// Set the partitioning columns
329    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
330        self.table_partition_cols = table_partition_cols
331            .into_iter()
332            .map(|f| Arc::new(f) as FieldRef)
333            .collect();
334        self
335    }
336
337    /// Set the table constraints
338    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
339        self.constraints = Some(constraints);
340        self
341    }
342
343    /// Set the estimated overall statistics of the files, taking `filters` into account.
344    /// Defaults to [`Statistics::new_unknown`].
345    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
346        self.statistics = Some(statistics);
347        self
348    }
349
350    /// Set the list of files to be processed, grouped into partitions.
351    ///
352    /// Each file must have a schema of `file_schema` or a subset. If
353    /// a particular file has a subset, the missing columns are
354    /// padded with NULLs.
355    ///
356    /// DataFusion may attempt to read each partition of files
357    /// concurrently, however files *within* a partition will be read
358    /// sequentially, one after the next.
359    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
360        self.file_groups = file_groups;
361        self
362    }
363
364    /// Add a new file group
365    ///
366    /// See [`Self::with_file_groups`] for more information
367    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
368        self.file_groups.push(file_group);
369        self
370    }
371
372    /// Add a file as a single group
373    ///
374    /// See [`Self::with_file_groups`] for more information.
375    pub fn with_file(self, file: PartitionedFile) -> Self {
376        self.with_file_group(FileGroup::new(vec![file]))
377    }
378
379    /// Set the output ordering of the files
380    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
381        self.output_ordering = output_ordering;
382        self
383    }
384
385    /// Set the file compression type
386    pub fn with_file_compression_type(
387        mut self,
388        file_compression_type: FileCompressionType,
389    ) -> Self {
390        self.file_compression_type = Some(file_compression_type);
391        self
392    }
393
394    /// Set whether new lines in values are supported for CSVOptions
395    ///
396    /// Parsing newlines in quoted values may be affected by execution behaviour such as
397    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
398    /// parsed successfully, which may reduce performance.
399    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
400        self.new_lines_in_values = Some(new_lines_in_values);
401        self
402    }
403
404    /// Set the batch_size property
405    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
406        self.batch_size = batch_size;
407        self
408    }
409
410    /// Register an expression adapter used to adapt filters and projections that are pushed down into the scan
411    /// from the logical schema to the physical schema of the file.
412    /// This can include things like:
413    /// - Column ordering changes
414    /// - Handling of missing columns
415    /// - Rewriting expression to use pre-computed values or file format specific optimizations
416    pub fn with_expr_adapter(
417        mut self,
418        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
419    ) -> Self {
420        self.expr_adapter_factory = expr_adapter;
421        self
422    }
423
424    /// Build the final [`FileScanConfig`] with all the configured settings.
425    ///
426    /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
427    /// Any unset optional fields will use their default values.
428    pub fn build(self) -> FileScanConfig {
429        let Self {
430            object_store_url,
431            file_schema,
432            file_source,
433            limit,
434            projection,
435            table_partition_cols,
436            constraints,
437            file_groups,
438            statistics,
439            output_ordering,
440            file_compression_type,
441            new_lines_in_values,
442            batch_size,
443            expr_adapter_factory: expr_adapter,
444        } = self;
445
446        let constraints = constraints.unwrap_or_default();
447        let statistics =
448            statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
449
450        let file_source = file_source
451            .with_statistics(statistics.clone())
452            .with_schema(Arc::clone(&file_schema));
453        let file_compression_type =
454            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
455        let new_lines_in_values = new_lines_in_values.unwrap_or(false);
456
457        FileScanConfig {
458            object_store_url,
459            file_schema,
460            file_source,
461            limit,
462            projection,
463            table_partition_cols,
464            constraints,
465            file_groups,
466            output_ordering,
467            file_compression_type,
468            new_lines_in_values,
469            batch_size,
470            expr_adapter_factory: expr_adapter,
471        }
472    }
473}
474
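/// Converts an existing [`FileScanConfig`] back into a [`FileScanConfigBuilder`],
/// preserving all of its settings so individual fields can be changed and the
/// config rebuilt. A minimal sketch of that round trip (assuming `config` is an
/// existing [`FileScanConfig`]):
///
/// ```rust,ignore
/// let new_config = FileScanConfigBuilder::from(config)
///     .with_limit(Some(10)) // change just the limit
///     .build();
/// ```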
475impl From<FileScanConfig> for FileScanConfigBuilder {
476    fn from(config: FileScanConfig) -> Self {
477        Self {
478            object_store_url: config.object_store_url,
479            file_schema: config.file_schema,
480            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
481            file_groups: config.file_groups,
482            statistics: config.file_source.statistics().ok(),
483            output_ordering: config.output_ordering,
484            file_compression_type: Some(config.file_compression_type),
485            new_lines_in_values: Some(config.new_lines_in_values),
486            limit: config.limit,
487            projection: config.projection,
488            table_partition_cols: config.table_partition_cols,
489            constraints: Some(config.constraints),
490            batch_size: config.batch_size,
491            expr_adapter_factory: config.expr_adapter_factory,
492        }
493    }
494}
495
496impl DataSource for FileScanConfig {
497    fn open(
498        &self,
499        partition: usize,
500        context: Arc<TaskContext>,
501    ) -> Result<SendableRecordBatchStream> {
502        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
503        let batch_size = self
504            .batch_size
505            .unwrap_or_else(|| context.session_config().batch_size());
506
507        let source = self
508            .file_source
509            .with_batch_size(batch_size)
510            .with_projection(self);
511
512        let opener = source.create_file_opener(object_store, self, partition);
513
514        let stream = FileStream::new(self, partition, opener, source.metrics())?;
515        Ok(Box::pin(cooperative(stream)))
516    }
517
518    fn as_any(&self) -> &dyn Any {
519        self
520    }
521
522    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
523        match t {
524            DisplayFormatType::Default | DisplayFormatType::Verbose => {
525                let schema = self.projected_schema();
526                let orderings = get_projected_output_ordering(self, &schema);
527
528                write!(f, "file_groups=")?;
529                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
530
531                if !schema.fields().is_empty() {
532                    write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
533                }
534
535                if let Some(limit) = self.limit {
536                    write!(f, ", limit={limit}")?;
537                }
538
539                display_orderings(f, &orderings)?;
540
541                if !self.constraints.is_empty() {
542                    write!(f, ", {}", self.constraints)?;
543                }
544
545                self.fmt_file_source(t, f)
546            }
547            DisplayFormatType::TreeRender => {
548                writeln!(f, "format={}", self.file_source.file_type())?;
549                self.file_source.fmt_extra(t, f)?;
550                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
551                writeln!(f, "files={num_files}")?;
552                Ok(())
553            }
554        }
555    }
556
557    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
558    fn repartitioned(
559        &self,
560        target_partitions: usize,
561        repartition_file_min_size: usize,
562        output_ordering: Option<LexOrdering>,
563    ) -> Result<Option<Arc<dyn DataSource>>> {
564        let source = self.file_source.repartitioned(
565            target_partitions,
566            repartition_file_min_size,
567            output_ordering,
568            self,
569        )?;
570
571        Ok(source.map(|s| Arc::new(s) as _))
572    }
573
574    fn output_partitioning(&self) -> Partitioning {
575        Partitioning::UnknownPartitioning(self.file_groups.len())
576    }
577
578    fn eq_properties(&self) -> EquivalenceProperties {
579        let (schema, constraints, _, orderings) = self.project();
580        EquivalenceProperties::new_with_orderings(schema, orderings)
581            .with_constraints(constraints)
582    }
583
584    fn scheduling_type(&self) -> SchedulingType {
585        SchedulingType::Cooperative
586    }
587
588    fn statistics(&self) -> Result<Statistics> {
589        Ok(self.projected_stats())
590    }
591
592    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
593        let source = FileScanConfigBuilder::from(self.clone())
594            .with_limit(limit)
595            .build();
596        Some(Arc::new(source))
597    }
598
599    fn fetch(&self) -> Option<usize> {
600        self.limit
601    }
602
603    fn metrics(&self) -> ExecutionPlanMetricsSet {
604        self.file_source.metrics().clone()
605    }
606
607    fn try_swapping_with_projection(
608        &self,
609        projection: &ProjectionExec,
610    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // This process could be moved into CsvExec, but that would overlap with its responsibilities.

        // Must be all column references, with no table partition columns (which cannot be projected)
614        let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
615            expr.as_any()
616                .downcast_ref::<Column>()
617                .map(|expr| expr.index() >= self.file_schema.fields().len())
618                .unwrap_or(false)
619        });
620
621        // If there is any non-column or alias-carrier expression, Projection should not be removed.
622        let no_aliases = all_alias_free_columns(projection.expr());
623
624        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
625            let file_scan = self.clone();
626            let source = Arc::clone(&file_scan.file_source);
627            let new_projections = new_projections_for_columns(
628                projection,
629                &file_scan
630                    .projection
631                    .clone()
632                    .unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
633            );
634            DataSourceExec::from_data_source(
635                FileScanConfigBuilder::from(file_scan)
636                    // Assign projected statistics to source
637                    .with_projection(Some(new_projections))
638                    .with_source(source)
639                    .build(),
640            ) as _
641        }))
642    }
643
644    fn try_pushdown_filters(
645        &self,
646        filters: Vec<Arc<dyn PhysicalExpr>>,
647        config: &ConfigOptions,
648    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
649        let result = self.file_source.try_pushdown_filters(filters, config)?;
650        match result.updated_node {
651            Some(new_file_source) => {
652                let file_scan_config = FileScanConfigBuilder::from(self.clone())
653                    .with_source(new_file_source)
654                    .build();
655                Ok(FilterPushdownPropagation {
656                    filters: result.filters,
657                    updated_node: Some(Arc::new(file_scan_config) as _),
658                })
659            }
660            None => {
661                // If the file source does not support filter pushdown, return the original config
662                Ok(FilterPushdownPropagation {
663                    filters: result.filters,
664                    updated_node: None,
665                })
666            }
667        }
668    }
669}
670
671impl FileScanConfig {
672    /// Create a new [`FileScanConfig`] with default settings for scanning files.
673    ///
674    /// See example on [`FileScanConfig`]
675    ///
676    /// No file groups are added by default. See [`Self::with_file`], [`Self::with_file_group`] and
677    /// [`Self::with_file_groups`].
678    ///
679    /// # Parameters:
680    /// * `object_store_url`: See [`Self::object_store_url`]
    /// * `file_schema`: See [`Self::file_schema`]
    /// * `file_source`: See [`Self::file_source`]
682    #[allow(deprecated)] // `new` will be removed same time as `with_source`
683    pub fn new(
684        object_store_url: ObjectStoreUrl,
685        file_schema: SchemaRef,
686        file_source: Arc<dyn FileSource>,
687    ) -> Self {
688        let statistics = Statistics::new_unknown(&file_schema);
689        let file_source = file_source
690            .with_statistics(statistics.clone())
691            .with_schema(Arc::clone(&file_schema));
692        Self {
693            object_store_url,
694            file_schema,
695            file_groups: vec![],
696            constraints: Constraints::default(),
697            projection: None,
698            limit: None,
699            table_partition_cols: vec![],
700            output_ordering: vec![],
701            file_compression_type: FileCompressionType::UNCOMPRESSED,
702            new_lines_in_values: false,
703            file_source: Arc::clone(&file_source),
704            batch_size: None,
705            expr_adapter_factory: None,
706        }
707    }
708
709    /// Set the file source
710    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
711    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
712        self.file_source =
713            file_source.with_statistics(Statistics::new_unknown(&self.file_schema));
714        self
715    }
716
717    /// Set the table constraints of the files
718    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
719    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
720        self.constraints = constraints;
721        self
722    }
723
724    /// Set the statistics of the files
725    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
726    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
727        self.file_source = self.file_source.with_statistics(statistics);
728        self
729    }
730
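    /// Returns the indices of the projected columns: the explicit `projection` if
    /// one was set, otherwise all file columns followed by all table partition columns.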
731    fn projection_indices(&self) -> Vec<usize> {
732        match &self.projection {
733            Some(proj) => proj.clone(),
734            None => (0..self.file_schema.fields().len()
735                + self.table_partition_cols.len())
736                .collect(),
737        }
738    }
739
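    /// Returns the projected statistics, taking `projection` and
    /// `table_partition_cols` into account.
    ///
    /// Statistics for partition columns are currently reported as unknown.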
740    pub fn projected_stats(&self) -> Statistics {
741        let statistics = self.file_source.statistics().unwrap();
742
743        let table_cols_stats = self
744            .projection_indices()
745            .into_iter()
746            .map(|idx| {
747                if idx < self.file_schema.fields().len() {
748                    statistics.column_statistics[idx].clone()
749                } else {
750                    // TODO provide accurate stat for partition column (#1186)
751                    ColumnStatistics::new_unknown()
752                }
753            })
754            .collect();
755
756        Statistics {
757            num_rows: statistics.num_rows,
758            // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
759            total_byte_size: statistics.total_byte_size,
760            column_statistics: table_cols_stats,
761        }
762    }
763
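    /// Returns the schema after `projection` is applied, including any projected
    /// table partition columns.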
764    pub fn projected_schema(&self) -> Arc<Schema> {
765        let table_fields: Vec<_> = self
766            .projection_indices()
767            .into_iter()
768            .map(|idx| {
769                if idx < self.file_schema.fields().len() {
770                    self.file_schema.field(idx).clone()
771                } else {
772                    let partition_idx = idx - self.file_schema.fields().len();
773                    Arc::unwrap_or_clone(Arc::clone(
774                        &self.table_partition_cols[partition_idx],
775                    ))
776                }
777            })
778            .collect();
779
780        Arc::new(Schema::new_with_metadata(
781            table_fields,
782            self.file_schema.metadata().clone(),
783        ))
784    }
785
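    /// Returns the table constraints projected onto the selected column indices.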
786    pub fn projected_constraints(&self) -> Constraints {
787        let indexes = self.projection_indices();
788        self.constraints.project(&indexes).unwrap_or_default()
789    }
790
791    /// Set the projection of the files
792    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
793    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
794        self.projection = projection;
795        self
796    }
797
798    /// Set the limit of the files
799    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
800    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
801        self.limit = limit;
802        self
803    }
804
805    /// Add a file as a single group
806    ///
    /// See [`Self::file_groups`] for more information.
808    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
809    #[allow(deprecated)]
810    pub fn with_file(self, file: PartitionedFile) -> Self {
811        self.with_file_group(FileGroup::new(vec![file]))
812    }
813
814    /// Add the file groups
815    ///
    /// See [`Self::file_groups`] for more information.
817    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
818    pub fn with_file_groups(mut self, mut file_groups: Vec<FileGroup>) -> Self {
819        self.file_groups.append(&mut file_groups);
820        self
821    }
822
823    /// Add a new file group
824    ///
    /// See [`Self::file_groups`] for more information.
826    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
827    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
828        self.file_groups.push(file_group);
829        self
830    }
831
832    /// Set the partitioning columns of the files
833    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
834    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
835        self.table_partition_cols = table_partition_cols
836            .into_iter()
837            .map(|f| Arc::new(f) as FieldRef)
838            .collect();
839        self
840    }
841
842    /// Set the output ordering of the files
843    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
844    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
845        self.output_ordering = output_ordering;
846        self
847    }
848
849    /// Set the file compression type
850    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
851    pub fn with_file_compression_type(
852        mut self,
853        file_compression_type: FileCompressionType,
854    ) -> Self {
855        self.file_compression_type = file_compression_type;
856        self
857    }
858
859    /// Set the new_lines_in_values property
860    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
861    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
862        self.new_lines_in_values = new_lines_in_values;
863        self
864    }
865
866    /// Set the batch_size property
867    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
868    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
869        self.batch_size = batch_size;
870        self
871    }
872
873    /// Specifies whether newlines in (quoted) values are supported.
874    ///
875    /// Parsing newlines in quoted values may be affected by execution behaviour such as
876    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
877    /// parsed successfully, which may reduce performance.
878    ///
879    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
880    pub fn newlines_in_values(&self) -> bool {
881        self.new_lines_in_values
882    }
883
884    /// Project the schema, constraints, and the statistics on the given column indices
885    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
886        if self.projection.is_none() && self.table_partition_cols.is_empty() {
887            return (
888                Arc::clone(&self.file_schema),
889                self.constraints.clone(),
890                self.file_source.statistics().unwrap().clone(),
891                self.output_ordering.clone(),
892            );
893        }
894
895        let schema = self.projected_schema();
896        let constraints = self.projected_constraints();
897        let stats = self.projected_stats();
898
899        let output_ordering = get_projected_output_ordering(self, &schema);
900
901        (schema, constraints, stats, output_ordering)
902    }
903
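    /// Returns the names of the projected file columns, ignoring any projected
    /// table partition columns. Returns `None` if no projection is set.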
904    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
905        self.projection.as_ref().map(|p| {
906            p.iter()
907                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
908                .map(|col_idx| self.file_schema.field(*col_idx).name())
909                .cloned()
910                .collect()
911        })
912    }
913
914    /// Projects only file schema, ignoring partition columns
915    pub fn projected_file_schema(&self) -> SchemaRef {
916        let fields = self.file_column_projection_indices().map(|indices| {
917            indices
918                .iter()
919                .map(|col_idx| self.file_schema.field(*col_idx))
920                .cloned()
921                .collect::<Vec<_>>()
922        });
923
924        fields.map_or_else(
925            || Arc::clone(&self.file_schema),
926            |f| {
927                Arc::new(Schema::new_with_metadata(
928                    f,
929                    self.file_schema.metadata.clone(),
930                ))
931            },
932        )
933    }
934
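    /// Returns the indices in `projection` that refer to file columns (as opposed
    /// to table partition columns). Returns `None` if no projection is set.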
935    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
936        self.projection.as_ref().map(|p| {
937            p.iter()
938                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
939                .copied()
940                .collect()
941        })
942    }
943
944    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
945    ///
946    /// The method distributes files across a target number of partitions while ensuring
947    /// files within each partition maintain sort order based on their min/max statistics.
948    ///
    /// The algorithm works as follows:
    /// 1. Take the files sorted by their minimum values
    /// 2. For each file:
    ///   - Find the eligible groups (those that are empty, or whose last file's max is below this file's min)
    ///   - Pick the smallest eligible group
    ///   - Create a new group if none is eligible
955    ///
956    /// # Parameters
957    /// * `table_schema`: Schema containing information about the columns
958    /// * `file_groups`: The original file groups to split
959    /// * `sort_order`: The lexicographical ordering to maintain within each group
960    /// * `target_partitions`: The desired number of output partitions
961    ///
962    /// # Returns
963    /// A new set of file groups, where files within each group are non-overlapping with respect to
964    /// their min/max statistics and maintain the specified sort order.
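    ///
    /// # Example
    ///
    /// A minimal sketch of the intended call; `table_schema`, `file_groups`, and
    /// `sort_order` are placeholders assumed to be built elsewhere, with the files
    /// carrying min/max statistics for the sort key:
    ///
    /// ```rust,ignore
    /// let regrouped = FileScanConfig::split_groups_by_statistics_with_target_partitions(
    ///     &table_schema,
    ///     &file_groups,
    ///     &sort_order,
    ///     4, // desired number of output partitions
    /// )?;
    /// ```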
965    pub fn split_groups_by_statistics_with_target_partitions(
966        table_schema: &SchemaRef,
967        file_groups: &[FileGroup],
968        sort_order: &LexOrdering,
969        target_partitions: usize,
970    ) -> Result<Vec<FileGroup>> {
971        if target_partitions == 0 {
972            return Err(DataFusionError::Internal(
973                "target_partitions must be greater than 0".to_string(),
974            ));
975        }
976
977        let flattened_files = file_groups
978            .iter()
979            .flat_map(FileGroup::iter)
980            .collect::<Vec<_>>();
981
982        if flattened_files.is_empty() {
983            return Ok(vec![]);
984        }
985
986        let statistics = MinMaxStatistics::new_from_files(
987            sort_order,
988            table_schema,
989            None,
990            flattened_files.iter().copied(),
991        )?;
992
993        let indices_sorted_by_min = statistics.min_values_sorted();
994
995        // Initialize with target_partitions empty groups
996        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
997
998        for (idx, min) in indices_sorted_by_min {
999            if let Some((_, group)) = file_groups_indices
1000                .iter_mut()
1001                .enumerate()
1002                .filter(|(_, group)| {
1003                    group.is_empty()
1004                        || min
1005                            > statistics
1006                                .max(*group.last().expect("groups should not be empty"))
1007                })
1008                .min_by_key(|(_, group)| group.len())
1009            {
1010                group.push(idx);
1011            } else {
1012                // Create a new group if no existing group fits
1013                file_groups_indices.push(vec![idx]);
1014            }
1015        }
1016
1017        // Remove any empty groups
1018        file_groups_indices.retain(|group| !group.is_empty());
1019
1020        // Assemble indices back into groups of PartitionedFiles
1021        Ok(file_groups_indices
1022            .into_iter()
1023            .map(|file_group_indices| {
1024                FileGroup::new(
1025                    file_group_indices
1026                        .into_iter()
1027                        .map(|idx| flattened_files[idx].clone())
1028                        .collect(),
1029                )
1030            })
1031            .collect())
1032    }
1033
    /// Attempts to bin-pack files into file groups such that any two files
    /// in a file group are ordered and non-overlapping with respect to their statistics.
    /// It will produce the smallest number of file groups possible.
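    ///
    /// For illustration (values assumed, not taken from real files): given files whose
    /// sort-key ranges are `[0, 10]`, `[5, 15]`, and `[20, 30]`, first-fit packing by
    /// ascending minimum yields two groups: `{[0, 10], [20, 30]}` and `{[5, 15]}`.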
1037    pub fn split_groups_by_statistics(
1038        table_schema: &SchemaRef,
1039        file_groups: &[FileGroup],
1040        sort_order: &LexOrdering,
1041    ) -> Result<Vec<FileGroup>> {
1042        let flattened_files = file_groups
1043            .iter()
1044            .flat_map(FileGroup::iter)
1045            .collect::<Vec<_>>();
1046        // First Fit:
1047        // * Choose the first file group that a file can be placed into.
1048        // * If it fits into no existing file groups, create a new one.
1049        //
1050        // By sorting files by min values and then applying first-fit bin packing,
1051        // we can produce the smallest number of file groups such that
1052        // files within a group are in order and non-overlapping.
1053        //
1054        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
1055        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
1056
1057        if flattened_files.is_empty() {
1058            return Ok(vec![]);
1059        }
1060
1061        let statistics = MinMaxStatistics::new_from_files(
1062            sort_order,
1063            table_schema,
1064            None,
1065            flattened_files.iter().copied(),
1066        )
1067        .map_err(|e| {
1068            e.context("construct min/max statistics for split_groups_by_statistics")
1069        })?;
1070
1071        let indices_sorted_by_min = statistics.min_values_sorted();
1072        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
1073
1074        for (idx, min) in indices_sorted_by_min {
1075            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
1076                // If our file is non-overlapping and comes _after_ the last file,
1077                // it fits in this file group.
1078                min > statistics.max(
1079                    *group
1080                        .last()
1081                        .expect("groups should be nonempty at construction"),
1082                )
1083            });
1084            match file_group_to_insert {
1085                Some(group) => group.push(idx),
1086                None => file_groups_indices.push(vec![idx]),
1087            }
1088        }
1089
1090        // Assemble indices back into groups of PartitionedFiles
1091        Ok(file_groups_indices
1092            .into_iter()
1093            .map(|file_group_indices| {
1094                file_group_indices
1095                    .into_iter()
1096                    .map(|idx| flattened_files[idx].clone())
1097                    .collect()
1098            })
1099            .collect())
1100    }
1101
1102    /// Returns a new [`DataSourceExec`] to scan the files specified by this config
1103    #[deprecated(since = "47.0.0", note = "use DataSourceExec::new instead")]
1104    pub fn build(self) -> Arc<DataSourceExec> {
1105        DataSourceExec::from_data_source(self)
1106    }
1107
    /// Write the file type based on `file_source`
1109    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1110        write!(f, ", file_type={}", self.file_source.file_type())?;
1111        self.file_source.fmt_extra(t, f)
1112    }
1113
1114    /// Returns the file_source
1115    pub fn file_source(&self) -> &Arc<dyn FileSource> {
1116        &self.file_source
1117    }
1118}
1119
1120impl Debug for FileScanConfig {
1121    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1122        write!(f, "FileScanConfig {{")?;
1123        write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1124
1125        write!(
1126            f,
1127            "statistics={:?}, ",
1128            self.file_source.statistics().unwrap()
1129        )?;
1130
1131        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1132        write!(f, "}}")
1133    }
1134}
1135
1136impl DisplayAs for FileScanConfig {
1137    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1138        let schema = self.projected_schema();
1139        let orderings = get_projected_output_ordering(self, &schema);
1140
1141        write!(f, "file_groups=")?;
1142        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1143
1144        if !schema.fields().is_empty() {
1145            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1146        }
1147
1148        if let Some(limit) = self.limit {
1149            write!(f, ", limit={limit}")?;
1150        }
1151
1152        display_orderings(f, &orderings)?;
1153
1154        if !self.constraints.is_empty() {
1155            write!(f, ", {}", self.constraints)?;
1156        }
1157
1158        Ok(())
1159    }
1160}
1161
1162/// A helper that projects partition columns into the file record batches.
1163///
1164/// One interesting trick is the usage of a cache for the key buffers of the partition column
1165/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
1166/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
1167/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
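///
/// For illustration, a minimal sketch (with hypothetical values, using the arrow
/// `DictionaryArray` API) of how a constant partition value is materialized for a
/// 4-row batch: the values array holds the single value and all keys are zero, so
/// only the key buffer scales with the batch size and can be shared across batches.
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use arrow::array::{DictionaryArray, StringArray, UInt16Array};
///
/// let keys = UInt16Array::from(vec![0u16; 4]);        // one key per row, all zero
/// let values = StringArray::from(vec!["2024-01-01"]); // the single partition value
/// let column = DictionaryArray::new(keys, Arc::new(values));
/// ```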
1168pub struct PartitionColumnProjector {
1169    /// An Arrow buffer initialized to zeros that represents the key array of all partition
1170    /// columns (partition columns are materialized by dictionary arrays with only one
1171    /// value in the dictionary, thus all the keys are equal to zero).
1172    key_buffer_cache: ZeroBufferGenerators,
1173    /// Mapping between the indexes in the list of partition columns and the target
1174    /// schema. Sorted by index in the target schema so that we can iterate on it to
1175    /// insert the partition columns in the target record batch.
1176    projected_partition_indexes: Vec<(usize, usize)>,
1177    /// The schema of the table once the projection was applied.
1178    projected_schema: SchemaRef,
1179}
1180
1181impl PartitionColumnProjector {
1182    // Create a projector to insert the partitioning columns into batches read from files
1183    // - `projected_schema`: the target schema with both file and partitioning columns
1184    // - `table_partition_cols`: all the partitioning column names
1185    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
1186        let mut idx_map = HashMap::new();
1187        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
1188            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
1189                idx_map.insert(partition_idx, schema_idx);
1190            }
1191        }
1192
1193        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
1194        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
1195
1196        Self {
1197            projected_partition_indexes,
1198            key_buffer_cache: Default::default(),
1199            projected_schema,
1200        }
1201    }
1202
1203    // Transform the batch read from the file by inserting the partitioning columns
1204    // to the right positions as deduced from `projected_schema`
1205    // - `file_batch`: batch read from the file, with internal projection applied
1206    // - `partition_values`: the list of partition values, one for each partition column
1207    pub fn project(
1208        &mut self,
1209        file_batch: RecordBatch,
1210        partition_values: &[ScalarValue],
1211    ) -> Result<RecordBatch> {
1212        let expected_cols =
1213            self.projected_schema.fields().len() - self.projected_partition_indexes.len();
1214
1215        if file_batch.columns().len() != expected_cols {
1216            return exec_err!(
1217                "Unexpected batch schema from file, expected {} cols but got {}",
1218                expected_cols,
1219                file_batch.columns().len()
1220            );
1221        }
1222
1223        let mut cols = file_batch.columns().to_vec();
1224        for &(pidx, sidx) in &self.projected_partition_indexes {
1225            let p_value =
1226                partition_values
1227                    .get(pidx)
1228                    .ok_or(DataFusionError::Execution(
1229                        "Invalid partitioning found on disk".to_string(),
1230                    ))?;
1231
1232            let mut partition_value = Cow::Borrowed(p_value);
1233
1234            // check if user forgot to dict-encode the partition value
1235            let field = self.projected_schema.field(sidx);
1236            let expected_data_type = field.data_type();
1237            let actual_data_type = partition_value.data_type();
1238            if let DataType::Dictionary(key_type, _) = expected_data_type {
1239                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
1240                    warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
1241                    partition_value = Cow::Owned(ScalarValue::Dictionary(
1242                        key_type.clone(),
1243                        Box::new(partition_value.as_ref().clone()),
1244                    ));
1245                }
1246            }
1247
1248            cols.insert(
1249                sidx,
1250                create_output_array(
1251                    &mut self.key_buffer_cache,
1252                    partition_value.as_ref(),
1253                    file_batch.num_rows(),
1254                )?,
1255            )
1256        }
1257
1258        RecordBatch::try_new_with_options(
1259            Arc::clone(&self.projected_schema),
1260            cols,
1261            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
1262        )
1263        .map_err(Into::into)
1264    }
1265}
1266
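/// Caches one all-zero key [`Buffer`] per supported dictionary key type.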
1267#[derive(Debug, Default)]
1268struct ZeroBufferGenerators {
1269    gen_i8: ZeroBufferGenerator<i8>,
1270    gen_i16: ZeroBufferGenerator<i16>,
1271    gen_i32: ZeroBufferGenerator<i32>,
1272    gen_i64: ZeroBufferGenerator<i64>,
1273    gen_u8: ZeroBufferGenerator<u8>,
1274    gen_u16: ZeroBufferGenerator<u16>,
1275    gen_u32: ZeroBufferGenerator<u32>,
1276    gen_u64: ZeroBufferGenerator<u64>,
1277}
1278
/// Generate an Arrow [`Buffer`] that contains zero values.
1280#[derive(Debug, Default)]
1281struct ZeroBufferGenerator<T>
1282where
1283    T: ArrowNativeType,
1284{
1285    cache: Option<Buffer>,
1286    _t: PhantomData<T>,
1287}
1288
1289impl<T> ZeroBufferGenerator<T>
1290where
1291    T: ArrowNativeType,
1292{
1293    const SIZE: usize = size_of::<T>();
1294
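    /// Returns a [`Buffer`] of `n_vals` zeroed keys, reusing the cached buffer when
    /// it is large enough and building (and caching) a new one otherwise.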
1295    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
1296        match &mut self.cache {
1297            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
1298                buf.slice_with_length(0, n_vals * Self::SIZE)
1299            }
1300            _ => {
1301                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
1302                key_buffer_builder.advance(n_vals); // keys are all 0
1303                self.cache.insert(key_buffer_builder.finish()).clone()
1304            }
1305        }
1306    }
1307}
1308
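/// Builds a dictionary array of length `len` whose keys are all zero (taken from
/// `buffer_gen`) and whose values array contains the single `dict_val`.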
1309fn create_dict_array<T>(
1310    buffer_gen: &mut ZeroBufferGenerator<T>,
1311    dict_val: &ScalarValue,
1312    len: usize,
1313    data_type: DataType,
1314) -> Result<ArrayRef>
1315where
1316    T: ArrowNativeType,
1317{
1318    let dict_vals = dict_val.to_array()?;
1319
1320    let sliced_key_buffer = buffer_gen.get_buffer(len);
1321
1322    // assemble pieces together
1323    let mut builder = ArrayData::builder(data_type)
1324        .len(len)
1325        .add_buffer(sliced_key_buffer);
1326    builder = builder.add_child_data(dict_vals.to_data());
1327    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
1328        builder.build().unwrap(),
1329    )))
1330}
1331
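/// Creates the output array for a single partition value repeated `len` times,
/// using the zero-key dictionary fast path for supported dictionary key types and
/// falling back to [`ScalarValue::to_array_of_size`] otherwise.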
1332fn create_output_array(
1333    key_buffer_cache: &mut ZeroBufferGenerators,
1334    val: &ScalarValue,
1335    len: usize,
1336) -> Result<ArrayRef> {
1337    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
1338        match key_type.as_ref() {
1339            DataType::Int8 => {
1340                return create_dict_array(
1341                    &mut key_buffer_cache.gen_i8,
1342                    dict_val,
1343                    len,
1344                    val.data_type(),
1345                );
1346            }
1347            DataType::Int16 => {
1348                return create_dict_array(
1349                    &mut key_buffer_cache.gen_i16,
1350                    dict_val,
1351                    len,
1352                    val.data_type(),
1353                );
1354            }
1355            DataType::Int32 => {
1356                return create_dict_array(
1357                    &mut key_buffer_cache.gen_i32,
1358                    dict_val,
1359                    len,
1360                    val.data_type(),
1361                );
1362            }
1363            DataType::Int64 => {
1364                return create_dict_array(
1365                    &mut key_buffer_cache.gen_i64,
1366                    dict_val,
1367                    len,
1368                    val.data_type(),
1369                );
1370            }
1371            DataType::UInt8 => {
1372                return create_dict_array(
1373                    &mut key_buffer_cache.gen_u8,
1374                    dict_val,
1375                    len,
1376                    val.data_type(),
1377                );
1378            }
1379            DataType::UInt16 => {
1380                return create_dict_array(
1381                    &mut key_buffer_cache.gen_u16,
1382                    dict_val,
1383                    len,
1384                    val.data_type(),
1385                );
1386            }
1387            DataType::UInt32 => {
1388                return create_dict_array(
1389                    &mut key_buffer_cache.gen_u32,
1390                    dict_val,
1391                    len,
1392                    val.data_type(),
1393                );
1394            }
1395            DataType::UInt64 => {
1396                return create_dict_array(
1397                    &mut key_buffer_cache.gen_u64,
1398                    dict_val,
1399                    len,
1400                    val.data_type(),
1401                );
1402            }
1403            _ => {}
1404        }
1405    }
1406
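    // Not a dictionary-encoded partition value (or an unsupported key type):
    // materialize the scalar `len` times directly.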
1407    val.to_array_of_size(len)
1408}
1409
1410/// The various listing tables do not attempt to read all files
1411/// concurrently; instead, they read files in sequence within a
1412/// partition.  This is an important property as it allows plans to
1413/// run against thousands of files without trying to open them all
1414/// concurrently.
1415///
1416/// However, it means that if more than one file is assigned to a partition,
1417/// the output sort order will not be preserved, as illustrated in the
1418/// following diagrams:
1419///
1420/// When only 1 file is assigned to each partition, each partition is
1421/// correctly sorted on `(A, B, C)`
1422///
1423/// ```text
1424///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
1425///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
1426///┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
1427///  │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
1428///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
1429///  │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
1430///┃                                          │                    │                     ┃
1431///  │                   │ │                    │                    │                 │
1432///┃                                          │                    │                     ┃
1433///  │                   │ │                    │                    │                 │
1434///┃                                          │                    │                     ┃
1435///  │                   │ │                    │                    │                 │
1436///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1437///     DataFusion           DataFusion           DataFusion           DataFusion
1438///┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
1439/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1440///
1441///                                      DataSourceExec
1442///```
1443///
1444/// However, when more than 1 file is assigned to each partition, each
1445/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
1446/// file is scanned, the same values for A, B and C can be repeated in
1447/// the same sorted stream.
1448///
1449///```text
1450///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1451///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1452///┃   ┌───────────────┐     ┌──────────────┐ │
1453///  │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
1454///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1455///  │ └───────────────┘ │ │ └──────────────┘   ┃
1456///┃   ┌───────────────┐     ┌──────────────┐ │
1457///  │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
1458///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1459///  │ └───────────────┘ │ │ └──────────────┘   ┃
1460///┃                                          │
1461///  │                   │ │                    ┃
1462///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
1463///     DataFusion           DataFusion         ┃
1464///┃    Partition 1          Partition 2
1465/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
1466///
1467///              DataSourceExec
1468///```
1469fn get_projected_output_ordering(
1470    base_config: &FileScanConfig,
1471    projected_schema: &SchemaRef,
1472) -> Vec<LexOrdering> {
1473    let mut all_orderings = vec![];
1474    for output_ordering in &base_config.output_ordering {
1475        let mut new_ordering = vec![];
1476        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
1477            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1478                let name = col.name();
1479                if let Some((idx, _)) = projected_schema.column_with_name(name) {
1480                    // Compute the new sort expression (with correct index) after projection:
1481                    new_ordering.push(PhysicalSortExpr::new(
1482                        Arc::new(Column::new(name, idx)),
1483                        *options,
1484                    ));
1485                    continue;
1486                }
1487            }
1488            // Cannot find the expression in the projected_schema; stop iterating
1489            // since the rest of the ordering is no longer valid
1490            break;
1491        }
1492
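        // `LexOrdering::new` returns `None` if no sort expressions survived the
        // projection, in which case there is no ordering left to preserve.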
1493        let Some(new_ordering) = LexOrdering::new(new_ordering) else {
1494            continue;
1495        };
1496
1497        // Check if any file groups are not sorted
1498        if base_config.file_groups.iter().any(|group| {
1499            if group.len() <= 1 {
1500                // File groups with <= 1 file are always sorted
1501                return false;
1502            }
1503
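            // Use each file's min/max statistics to verify that the files in this
            // group do not overlap on the sort key, i.e. that reading them in
            // sequence still yields sorted output.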
1504            let statistics = match MinMaxStatistics::new_from_files(
1505                &new_ordering,
1506                projected_schema,
1507                base_config.projection.as_deref(),
1508                group.iter(),
1509            ) {
1510                Ok(statistics) => statistics,
1511                Err(e) => {
1512                    log::trace!("Error fetching statistics for file group: {e}");
1513                    // we can't prove that it's ordered, so we have to reject it
1514                    return true;
1515                }
1516            };
1517
1518            !statistics.is_sorted()
1519        }) {
1520            debug!(
1521                "Skipping specified output ordering {:?}. \
1522                Some file groups couldn't be determined to be sorted: {:?}",
1523                base_config.output_ordering[0], base_config.file_groups
1524            );
1525            continue;
1526        }
1527
1528        all_orderings.push(new_ordering);
1529    }
1530    all_orderings
1531}
1532
1533/// Convert a type to a type suitable for use as a `ListingTable`
1534/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
1535/// a reasonable trade off between supporting a large number of partition
1536/// values and space efficiency.
1537///
1538/// Use this to specify types for partition columns. However,
1539/// you MAY also choose not to dictionary-encode the data or to use a
1540/// different dictionary type.
1541///
1542/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same way.
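///
/// # Example
///
/// A minimal sketch; the import path below is an assumption about where this
/// function is exposed and may need adjusting to your crate layout:
///
/// ```
/// # use arrow::datatypes::DataType;
/// # use datafusion_datasource::file_scan_config::wrap_partition_type_in_dict;
/// let dict_type = wrap_partition_type_in_dict(DataType::Utf8);
/// assert_eq!(
///     dict_type,
///     DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
/// );
/// ```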
1543pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1544    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1545}
1546
1547/// Wrap a [`ScalarValue`] for a partition column in a dictionary of the
1548/// type described in the documentation of [`wrap_partition_type_in_dict`],
1549/// which wraps the corresponding data types.
1550pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1551    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1552}
1553
1554#[cfg(test)]
1555mod tests {
1556    use super::*;
1557    use crate::{
1558        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1559        verify_sort_integrity,
1560    };
1561
1562    use arrow::array::{Int32Array, RecordBatch};
1563    use datafusion_common::stats::Precision;
1564    use datafusion_common::{assert_batches_eq, internal_err};
1565    use datafusion_expr::SortExpr;
1566    use datafusion_physical_expr::create_physical_sort_expr;
1567
1568    /// Returns the column names on the schema
1569    pub fn columns(schema: &Schema) -> Vec<String> {
1570        schema.fields().iter().map(|f| f.name().clone()).collect()
1571    }
1572
1573    #[test]
1574    fn physical_plan_config_no_projection() {
1575        let file_schema = aggr_test_schema();
1576        let conf = config_for_projection(
1577            Arc::clone(&file_schema),
1578            None,
1579            Statistics::new_unknown(&file_schema),
1580            to_partition_cols(vec![(
1581                "date".to_owned(),
1582                wrap_partition_type_in_dict(DataType::Utf8),
1583            )]),
1584        );
1585
1586        let (proj_schema, _, proj_statistics, _) = conf.project();
1587        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1588        assert_eq!(
1589            proj_schema.field(file_schema.fields().len()).name(),
1590            "date",
1591            "partition columns are the last columns"
1592        );
1593        assert_eq!(
1594            proj_statistics.column_statistics.len(),
1595            file_schema.fields().len() + 1
1596        );
1597        // TODO implement tests for partition column statistics once implemented
1598
1599        let col_names = conf.projected_file_column_names();
1600        assert_eq!(col_names, None);
1601
1602        let col_indices = conf.file_column_projection_indices();
1603        assert_eq!(col_indices, None);
1604    }
1605
1606    #[test]
1607    fn physical_plan_config_no_projection_tab_cols_as_field() {
1608        let file_schema = aggr_test_schema();
1609
1610        // make a table_partition_col as a field
1611        let table_partition_col =
1612            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1613                .with_metadata(HashMap::from_iter(vec![(
1614                    "key_whatever".to_owned(),
1615                    "value_whatever".to_owned(),
1616                )]));
1617
1618        let conf = config_for_projection(
1619            Arc::clone(&file_schema),
1620            None,
1621            Statistics::new_unknown(&file_schema),
1622            vec![table_partition_col.clone()],
1623        );
1624
1625        // verify the proj_schema includes the partition column as the last column, exactly as the field was defined
1626        let proj_schema = conf.projected_schema();
1627        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1628        assert_eq!(
1629            *proj_schema.field(file_schema.fields().len()),
1630            table_partition_col,
1631            "partition columns are the last columns and ust have all values defined in created field"
1632        );
1633    }
1634
1635    #[test]
1636    fn physical_plan_config_with_projection() {
1637        let file_schema = aggr_test_schema();
1638        let conf = config_for_projection(
1639            Arc::clone(&file_schema),
1640            Some(vec![file_schema.fields().len(), 0]),
1641            Statistics {
1642                num_rows: Precision::Inexact(10),
1643                // assign the column index to distinct_count to help assert
1644                // the source statistic after the projection
1645                column_statistics: (0..file_schema.fields().len())
1646                    .map(|i| ColumnStatistics {
1647                        distinct_count: Precision::Inexact(i),
1648                        ..Default::default()
1649                    })
1650                    .collect(),
1651                total_byte_size: Precision::Absent,
1652            },
1653            to_partition_cols(vec![(
1654                "date".to_owned(),
1655                wrap_partition_type_in_dict(DataType::Utf8),
1656            )]),
1657        );
1658
1659        let (proj_schema, _, proj_statistics, _) = conf.project();
1660        assert_eq!(
1661            columns(&proj_schema),
1662            vec!["date".to_owned(), "c1".to_owned()]
1663        );
1664        let proj_stat_cols = proj_statistics.column_statistics;
1665        assert_eq!(proj_stat_cols.len(), 2);
1666        // TODO implement tests for proj_stat_cols[0] once partition column
1667        // statistics are implemented
1668        assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));
1669
1670        let col_names = conf.projected_file_column_names();
1671        assert_eq!(col_names, Some(vec!["c1".to_owned()]));
1672
1673        let col_indices = conf.file_column_projection_indices();
1674        assert_eq!(col_indices, Some(vec![0]));
1675    }
1676
1677    #[test]
1678    fn partition_column_projector() {
1679        let file_batch = build_table_i32(
1680            ("a", &vec![0, 1, 2]),
1681            ("b", &vec![-2, -1, 0]),
1682            ("c", &vec![10, 11, 12]),
1683        );
1684        let partition_cols = vec![
1685            (
1686                "year".to_owned(),
1687                wrap_partition_type_in_dict(DataType::Utf8),
1688            ),
1689            (
1690                "month".to_owned(),
1691                wrap_partition_type_in_dict(DataType::Utf8),
1692            ),
1693            (
1694                "day".to_owned(),
1695                wrap_partition_type_in_dict(DataType::Utf8),
1696            ),
1697        ];
1698        // create a projected schema
1699        let statistics = Statistics {
1700            num_rows: Precision::Inexact(3),
1701            total_byte_size: Precision::Absent,
1702            column_statistics: Statistics::unknown_column(&file_batch.schema()),
1703        };
1704
1705        let conf = config_for_projection(
1706            file_batch.schema(),
1707            // keep all cols from file and 2 from partitioning
1708            Some(vec![
1709                0,
1710                1,
1711                2,
1712                file_batch.schema().fields().len(),
1713                file_batch.schema().fields().len() + 2,
1714            ]),
1715            statistics.clone(),
1716            to_partition_cols(partition_cols.clone()),
1717        );
1718
1719        let source_statistics = conf.file_source.statistics().unwrap();
1720        let conf_stats = conf.statistics().unwrap();
1721
1722        // projection should be reflected in the file source statistics
1723        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
1724
1725        // 3 original statistics + 2 partition statistics
1726        assert_eq!(conf_stats.column_statistics.len(), 5);
1727
1728        // file statistics should not be modified
1729        assert_eq!(source_statistics, statistics);
1730        assert_eq!(source_statistics.column_statistics.len(), 3);
1731
1732        let proj_schema = conf.projected_schema();
1733        // create a projector for that projected schema
1734        let mut proj = PartitionColumnProjector::new(
1735            proj_schema,
1736            &partition_cols
1737                .iter()
1738                .map(|x| x.0.clone())
1739                .collect::<Vec<_>>(),
1740        );
1741
1742        // project first batch
1743        let projected_batch = proj
1744            .project(
1745                // file_batch is ok here because we kept all the file cols in the projection
1746                file_batch,
1747                &[
1748                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1749                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1750                    wrap_partition_value_in_dict(ScalarValue::from("26")),
1751                ],
1752            )
1753            .expect("Projection of partition columns into record batch failed");
1754        let expected = [
1755            "+---+----+----+------+-----+",
1756            "| a | b  | c  | year | day |",
1757            "+---+----+----+------+-----+",
1758            "| 0 | -2 | 10 | 2021 | 26  |",
1759            "| 1 | -1 | 11 | 2021 | 26  |",
1760            "| 2 | 0  | 12 | 2021 | 26  |",
1761            "+---+----+----+------+-----+",
1762        ];
1763        assert_batches_eq!(expected, &[projected_batch]);
1764
1765        // project another batch that is larger than the previous one
1766        let file_batch = build_table_i32(
1767            ("a", &vec![5, 6, 7, 8, 9]),
1768            ("b", &vec![-10, -9, -8, -7, -6]),
1769            ("c", &vec![12, 13, 14, 15, 16]),
1770        );
1771        let projected_batch = proj
1772            .project(
1773                // file_batch is ok here because we kept all the file cols in the projection
1774                file_batch,
1775                &[
1776                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1777                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1778                    wrap_partition_value_in_dict(ScalarValue::from("27")),
1779                ],
1780            )
1781            .expect("Projection of partition columns into record batch failed");
1782        let expected = [
1783            "+---+-----+----+------+-----+",
1784            "| a | b   | c  | year | day |",
1785            "+---+-----+----+------+-----+",
1786            "| 5 | -10 | 12 | 2021 | 27  |",
1787            "| 6 | -9  | 13 | 2021 | 27  |",
1788            "| 7 | -8  | 14 | 2021 | 27  |",
1789            "| 8 | -7  | 15 | 2021 | 27  |",
1790            "| 9 | -6  | 16 | 2021 | 27  |",
1791            "+---+-----+----+------+-----+",
1792        ];
1793        assert_batches_eq!(expected, &[projected_batch]);
1794
1795        // project another batch that is smaller than the previous one
1796        let file_batch = build_table_i32(
1797            ("a", &vec![0, 1, 3]),
1798            ("b", &vec![2, 3, 4]),
1799            ("c", &vec![4, 5, 6]),
1800        );
1801        let projected_batch = proj
1802            .project(
1803                // file_batch is ok here because we kept all the file cols in the projection
1804                file_batch,
1805                &[
1806                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1807                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1808                    wrap_partition_value_in_dict(ScalarValue::from("28")),
1809                ],
1810            )
1811            .expect("Projection of partition columns into record batch failed");
1812        let expected = [
1813            "+---+---+---+------+-----+",
1814            "| a | b | c | year | day |",
1815            "+---+---+---+------+-----+",
1816            "| 0 | 2 | 4 | 2021 | 28  |",
1817            "| 1 | 3 | 5 | 2021 | 28  |",
1818            "| 3 | 4 | 6 | 2021 | 28  |",
1819            "+---+---+---+------+-----+",
1820        ];
1821        assert_batches_eq!(expected, &[projected_batch]);
1822
1823        // forgot to dictionary-wrap the scalar value
1824        let file_batch = build_table_i32(
1825            ("a", &vec![0, 1, 2]),
1826            ("b", &vec![-2, -1, 0]),
1827            ("c", &vec![10, 11, 12]),
1828        );
1829        let projected_batch = proj
1830            .project(
1831                // file_batch is ok here because we kept all the file cols in the projection
1832                file_batch,
1833                &[
1834                    ScalarValue::from("2021"),
1835                    ScalarValue::from("10"),
1836                    ScalarValue::from("26"),
1837                ],
1838            )
1839            .expect("Projection of partition columns into record batch failed");
1840        let expected = [
1841            "+---+----+----+------+-----+",
1842            "| a | b  | c  | year | day |",
1843            "+---+----+----+------+-----+",
1844            "| 0 | -2 | 10 | 2021 | 26  |",
1845            "| 1 | -1 | 11 | 2021 | 26  |",
1846            "| 2 | 0  | 12 | 2021 | 26  |",
1847            "+---+----+----+------+-----+",
1848        ];
1849        assert_batches_eq!(expected, &[projected_batch]);
1850    }
1851
1852    #[test]
1853    fn test_projected_file_schema_with_partition_col() {
1854        let schema = aggr_test_schema();
1855        let partition_cols = vec![
1856            (
1857                "part1".to_owned(),
1858                wrap_partition_type_in_dict(DataType::Utf8),
1859            ),
1860            (
1861                "part2".to_owned(),
1862                wrap_partition_type_in_dict(DataType::Utf8),
1863            ),
1864        ];
1865
1866        // Projected file schema for config with projection including partition column
1867        let projection = config_for_projection(
1868            schema.clone(),
1869            Some(vec![0, 3, 5, schema.fields().len()]),
1870            Statistics::new_unknown(&schema),
1871            to_partition_cols(partition_cols),
1872        )
1873        .projected_file_schema();
1874
1875        // Assert partition column filtered out in projected file schema
1876        let expected_columns = vec!["c1", "c4", "c6"];
1877        let actual_columns = projection
1878            .fields()
1879            .iter()
1880            .map(|f| f.name().clone())
1881            .collect::<Vec<_>>();
1882        assert_eq!(expected_columns, actual_columns);
1883    }
1884
1885    #[test]
1886    fn test_projected_file_schema_without_projection() {
1887        let schema = aggr_test_schema();
1888        let partition_cols = vec![
1889            (
1890                "part1".to_owned(),
1891                wrap_partition_type_in_dict(DataType::Utf8),
1892            ),
1893            (
1894                "part2".to_owned(),
1895                wrap_partition_type_in_dict(DataType::Utf8),
1896            ),
1897        ];
1898
1899        // Projected file schema for config without projection
1900        let projection = config_for_projection(
1901            schema.clone(),
1902            None,
1903            Statistics::new_unknown(&schema),
1904            to_partition_cols(partition_cols),
1905        )
1906        .projected_file_schema();
1907
1908        // Assert projected file schema is equal to file schema
1909        assert_eq!(projection.fields(), schema.fields());
1910    }
1911
1912    #[test]
1913    fn test_split_groups_by_statistics() -> Result<()> {
1914        use chrono::TimeZone;
1915        use datafusion_common::DFSchema;
1916        use datafusion_expr::execution_props::ExecutionProps;
1917        use object_store::{path::Path, ObjectMeta};
1918
1919        struct File {
1920            name: &'static str,
1921            date: &'static str,
1922            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1923        }
1924        impl File {
1925            fn new(
1926                name: &'static str,
1927                date: &'static str,
1928                statistics: Vec<Option<(f64, f64)>>,
1929            ) -> Self {
1930                Self::new_nullable(
1931                    name,
1932                    date,
1933                    statistics
1934                        .into_iter()
1935                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1936                        .collect(),
1937                )
1938            }
1939
1940            fn new_nullable(
1941                name: &'static str,
1942                date: &'static str,
1943                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1944            ) -> Self {
1945                Self {
1946                    name,
1947                    date,
1948                    statistics,
1949                }
1950            }
1951        }
1952
1953        struct TestCase {
1954            name: &'static str,
1955            file_schema: Schema,
1956            files: Vec<File>,
1957            sort: Vec<SortExpr>,
1958            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1959        }
1960
1961        use datafusion_expr::col;
1962        let cases = vec![
1963            TestCase {
1964                name: "test sort",
1965                file_schema: Schema::new(vec![Field::new(
1966                    "value".to_string(),
1967                    DataType::Float64,
1968                    false,
1969                )]),
1970                files: vec![
1971                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1972                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1973                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1974                ],
1975                sort: vec![col("value").sort(true, false)],
1976                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1977            },
1978            // same input but file '2' is in the middle
1979            // test that we still order correctly
1980            TestCase {
1981                name: "test sort with files ordered differently",
1982                file_schema: Schema::new(vec![Field::new(
1983                    "value".to_string(),
1984                    DataType::Float64,
1985                    false,
1986                )]),
1987                files: vec![
1988                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1989                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1990                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1991                ],
1992                sort: vec![col("value").sort(true, false)],
1993                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1994            },
1995            TestCase {
1996                name: "reverse sort",
1997                file_schema: Schema::new(vec![Field::new(
1998                    "value".to_string(),
1999                    DataType::Float64,
2000                    false,
2001                )]),
2002                files: vec![
2003                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2004                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
2005                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
2006                ],
2007                sort: vec![col("value").sort(false, true)],
2008                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
2009            },
2010            TestCase {
2011                name: "nullable sort columns, nulls last",
2012                file_schema: Schema::new(vec![Field::new(
2013                    "value".to_string(),
2014                    DataType::Float64,
2015                    true,
2016                )]),
2017                files: vec![
2018                    File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
2019                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
2020                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
2021                ],
2022                sort: vec![col("value").sort(true, false)],
2023                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
2024            },
2025            TestCase {
2026                name: "nullable sort columns, nulls first",
2027                file_schema: Schema::new(vec![Field::new(
2028                    "value".to_string(),
2029                    DataType::Float64,
2030                    true,
2031                )]),
2032                files: vec![
2033                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
2034                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
2035                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
2036                ],
2037                sort: vec![col("value").sort(true, true)],
2038                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
2039            },
2040            TestCase {
2041                name: "all three non-overlapping",
2042                file_schema: Schema::new(vec![Field::new(
2043                    "value".to_string(),
2044                    DataType::Float64,
2045                    false,
2046                )]),
2047                files: vec![
2048                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2049                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
2050                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
2051                ],
2052                sort: vec![col("value").sort(true, false)],
2053                expected_result: Ok(vec![vec!["0", "1", "2"]]),
2054            },
2055            TestCase {
2056                name: "all three overlapping",
2057                file_schema: Schema::new(vec![Field::new(
2058                    "value".to_string(),
2059                    DataType::Float64,
2060                    false,
2061                )]),
2062                files: vec![
2063                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2064                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
2065                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
2066                ],
2067                sort: vec![col("value").sort(true, false)],
2068                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
2069            },
2070            TestCase {
2071                name: "empty input",
2072                file_schema: Schema::new(vec![Field::new(
2073                    "value".to_string(),
2074                    DataType::Float64,
2075                    false,
2076                )]),
2077                files: vec![],
2078                sort: vec![col("value").sort(true, false)],
2079                expected_result: Ok(vec![]),
2080            },
2081            TestCase {
2082                name: "one file missing statistics",
2083                file_schema: Schema::new(vec![Field::new(
2084                    "value".to_string(),
2085                    DataType::Float64,
2086                    false,
2087                )]),
2088                files: vec![
2089                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2090                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
2091                    File::new("2", "2023-01-02", vec![None]),
2092                ],
2093                sort: vec![col("value").sort(true, false)],
2094                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
2095            },
2096        ];
2097
2098        for case in cases {
2099            let table_schema = Arc::new(Schema::new(
2100                case.file_schema
2101                    .fields()
2102                    .clone()
2103                    .into_iter()
2104                    .cloned()
2105                    .chain(Some(Arc::new(Field::new(
2106                        "date".to_string(),
2107                        DataType::Utf8,
2108                        false,
2109                    ))))
2110                    .collect::<Vec<_>>(),
2111            ));
2112            let Some(sort_order) = LexOrdering::new(
2113                case.sort
2114                    .into_iter()
2115                    .map(|expr| {
2116                        create_physical_sort_expr(
2117                            &expr,
2118                            &DFSchema::try_from(table_schema.as_ref().clone())?,
2119                            &ExecutionProps::default(),
2120                        )
2121                    })
2122                    .collect::<Result<Vec<_>>>()?,
2123            ) else {
2124                return internal_err!("This test should always use an ordering");
2125            };
2126
2127            let partitioned_files = FileGroup::new(
2128                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
2129            );
2130            let result = FileScanConfig::split_groups_by_statistics(
2131                &table_schema,
2132                &[partitioned_files.clone()],
2133                &sort_order,
2134            );
2135            let results_by_name = result
2136                .as_ref()
2137                .map(|file_groups| {
2138                    file_groups
2139                        .iter()
2140                        .map(|file_group| {
2141                            file_group
2142                                .iter()
2143                                .map(|file| {
2144                                    partitioned_files
2145                                        .iter()
2146                                        .find_map(|f| {
2147                                            if f.object_meta == file.object_meta {
2148                                                Some(
2149                                                    f.object_meta
2150                                                        .location
2151                                                        .as_ref()
2152                                                        .rsplit('/')
2153                                                        .next()
2154                                                        .unwrap()
2155                                                        .trim_end_matches(".parquet"),
2156                                                )
2157                                            } else {
2158                                                None
2159                                            }
2160                                        })
2161                                        .unwrap()
2162                                })
2163                                .collect::<Vec<_>>()
2164                        })
2165                        .collect::<Vec<_>>()
2166                })
2167                .map_err(|e| e.strip_backtrace().leak() as &'static str);
2168
2169            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
2170        }
2171
2172        return Ok(());
2173
2174        impl From<File> for PartitionedFile {
2175            fn from(file: File) -> Self {
2176                PartitionedFile {
2177                    object_meta: ObjectMeta {
2178                        location: Path::from(format!(
2179                            "data/date={}/{}.parquet",
2180                            file.date, file.name
2181                        )),
2182                        last_modified: chrono::Utc.timestamp_nanos(0),
2183                        size: 0,
2184                        e_tag: None,
2185                        version: None,
2186                    },
2187                    partition_values: vec![ScalarValue::from(file.date)],
2188                    range: None,
2189                    statistics: Some(Arc::new(Statistics {
2190                        num_rows: Precision::Absent,
2191                        total_byte_size: Precision::Absent,
2192                        column_statistics: file
2193                            .statistics
2194                            .into_iter()
2195                            .map(|stats| {
2196                                stats
2197                                    .map(|(min, max)| ColumnStatistics {
2198                                        min_value: Precision::Exact(
2199                                            ScalarValue::Float64(min),
2200                                        ),
2201                                        max_value: Precision::Exact(
2202                                            ScalarValue::Float64(max),
2203                                        ),
2204                                        ..Default::default()
2205                                    })
2206                                    .unwrap_or_default()
2207                            })
2208                            .collect::<Vec<_>>(),
2209                    })),
2210                    extensions: None,
2211                    metadata_size_hint: None,
2212                }
2213            }
2214        }
2215    }
2216
2217    // sets defaults for configs that play no role in projections
2218    fn config_for_projection(
2219        file_schema: SchemaRef,
2220        projection: Option<Vec<usize>>,
2221        statistics: Statistics,
2222        table_partition_cols: Vec<Field>,
2223    ) -> FileScanConfig {
2224        FileScanConfigBuilder::new(
2225            ObjectStoreUrl::parse("test:///").unwrap(),
2226            file_schema,
2227            Arc::new(MockSource::default()),
2228        )
2229        .with_projection(projection)
2230        .with_statistics(statistics)
2231        .with_table_partition_cols(table_partition_cols)
2232        .build()
2233    }
2234
2235    /// Convert partition columns from `Vec<(String, DataType)>` to `Vec<Field>`
2236    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
2237        table_partition_cols
2238            .iter()
2239            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
2240            .collect::<Vec<_>>()
2241    }
2242
2243    /// Returns a record batch with 3 columns of i32 in memory
2244    pub fn build_table_i32(
2245        a: (&str, &Vec<i32>),
2246        b: (&str, &Vec<i32>),
2247        c: (&str, &Vec<i32>),
2248    ) -> RecordBatch {
2249        let schema = Schema::new(vec![
2250            Field::new(a.0, DataType::Int32, false),
2251            Field::new(b.0, DataType::Int32, false),
2252            Field::new(c.0, DataType::Int32, false),
2253        ]);
2254
2255        RecordBatch::try_new(
2256            Arc::new(schema),
2257            vec![
2258                Arc::new(Int32Array::from(a.1.clone())),
2259                Arc::new(Int32Array::from(b.1.clone())),
2260                Arc::new(Int32Array::from(c.1.clone())),
2261            ],
2262        )
2263        .unwrap()
2264    }
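
    /// Minimal sketch of how partition values are materialized by the private
    /// helpers above: a dictionary-wrapped `ScalarValue` expands to a
    /// `DictionaryArray` with all-zero keys. Assumes `ZeroBufferGenerators`
    /// derives `Default`; adjust the construction if it does not.
    #[test]
    fn create_output_array_dictionary_sketch() -> Result<()> {
        use arrow::array::Array;

        let mut gens = ZeroBufferGenerators::default();
        let val = wrap_partition_value_in_dict(ScalarValue::from("2021"));
        let arr = create_output_array(&mut gens, &val, 3)?;

        // 3 rows, each keyed to the single dictionary value "2021"
        assert_eq!(arr.len(), 3);
        assert_eq!(
            arr.data_type(),
            &wrap_partition_type_in_dict(DataType::Utf8)
        );
        Ok(())
    }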
2265
2266    #[test]
2267    fn test_file_scan_config_builder() {
2268        let file_schema = aggr_test_schema();
2269        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2270        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2271
2272        // Create a builder with required parameters
2273        let builder = FileScanConfigBuilder::new(
2274            object_store_url.clone(),
2275            Arc::clone(&file_schema),
2276            Arc::clone(&file_source),
2277        );
2278
2279        // Build with various configurations
2280        let config = builder
2281            .with_limit(Some(1000))
2282            .with_projection(Some(vec![0, 1]))
2283            .with_table_partition_cols(vec![Field::new(
2284                "date",
2285                wrap_partition_type_in_dict(DataType::Utf8),
2286                false,
2287            )])
2288            .with_statistics(Statistics::new_unknown(&file_schema))
2289            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
2290                "test.parquet".to_string(),
2291                1024,
2292            )])])
2293            .with_output_ordering(vec![[PhysicalSortExpr::new_default(Arc::new(
2294                Column::new("date", 0),
2295            ))]
2296            .into()])
2297            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
2298            .with_newlines_in_values(true)
2299            .build();
2300
2301        // Verify the built config has all the expected values
2302        assert_eq!(config.object_store_url, object_store_url);
2303        assert_eq!(config.file_schema, file_schema);
2304        assert_eq!(config.limit, Some(1000));
2305        assert_eq!(config.projection, Some(vec![0, 1]));
2306        assert_eq!(config.table_partition_cols.len(), 1);
2307        assert_eq!(config.table_partition_cols[0].name(), "date");
2308        assert_eq!(config.file_groups.len(), 1);
2309        assert_eq!(config.file_groups[0].len(), 1);
2310        assert_eq!(
2311            config.file_groups[0][0].object_meta.location.as_ref(),
2312            "test.parquet"
2313        );
2314        assert_eq!(
2315            config.file_compression_type,
2316            FileCompressionType::UNCOMPRESSED
2317        );
2318        assert!(config.new_lines_in_values);
2319        assert_eq!(config.output_ordering.len(), 1);
2320    }
2321
2322    #[test]
2323    fn test_file_scan_config_builder_defaults() {
2324        let file_schema = aggr_test_schema();
2325        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2326        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2327
2328        // Create a builder with only required parameters and build without any additional configurations
2329        let config = FileScanConfigBuilder::new(
2330            object_store_url.clone(),
2331            Arc::clone(&file_schema),
2332            Arc::clone(&file_source),
2333        )
2334        .build();
2335
2336        // Verify default values
2337        assert_eq!(config.object_store_url, object_store_url);
2338        assert_eq!(config.file_schema, file_schema);
2339        assert_eq!(config.limit, None);
2340        assert_eq!(config.projection, None);
2341        assert!(config.table_partition_cols.is_empty());
2342        assert!(config.file_groups.is_empty());
2343        assert_eq!(
2344            config.file_compression_type,
2345            FileCompressionType::UNCOMPRESSED
2346        );
2347        assert!(!config.new_lines_in_values);
2348        assert!(config.output_ordering.is_empty());
2349        assert!(config.constraints.is_empty());
2350
2351        // Verify statistics are set to unknown
2352        assert_eq!(
2353            config.file_source.statistics().unwrap().num_rows,
2354            Precision::Absent
2355        );
2356        assert_eq!(
2357            config.file_source.statistics().unwrap().total_byte_size,
2358            Precision::Absent
2359        );
2360        assert_eq!(
2361            config
2362                .file_source
2363                .statistics()
2364                .unwrap()
2365                .column_statistics
2366                .len(),
2367            file_schema.fields().len()
2368        );
2369        for stat in config.file_source.statistics().unwrap().column_statistics {
2370            assert_eq!(stat.distinct_count, Precision::Absent);
2371            assert_eq!(stat.min_value, Precision::Absent);
2372            assert_eq!(stat.max_value, Precision::Absent);
2373            assert_eq!(stat.null_count, Precision::Absent);
2374        }
2375    }
2376
2377    #[test]
2378    fn test_file_scan_config_builder_new_from() {
2379        let schema = aggr_test_schema();
2380        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2381        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2382        let partition_cols = vec![Field::new(
2383            "date",
2384            wrap_partition_type_in_dict(DataType::Utf8),
2385            false,
2386        )];
2387        let file = PartitionedFile::new("test_file.parquet", 100);
2388
2389        // Create a config with non-default values
2390        let original_config = FileScanConfigBuilder::new(
2391            object_store_url.clone(),
2392            Arc::clone(&schema),
2393            Arc::clone(&file_source),
2394        )
2395        .with_projection(Some(vec![0, 2]))
2396        .with_limit(Some(10))
2397        .with_table_partition_cols(partition_cols.clone())
2398        .with_file(file.clone())
2399        .with_constraints(Constraints::default())
2400        .with_newlines_in_values(true)
2401        .build();
2402
2403        // Create a new builder from the config
2404        let new_builder = FileScanConfigBuilder::from(original_config);
2405
2406        // Build a new config from this builder
2407        let new_config = new_builder.build();
2408
2409        // Verify properties match
2410        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2411        assert_eq!(new_config.object_store_url, object_store_url);
2412        assert_eq!(new_config.file_schema, schema);
2413        assert_eq!(new_config.projection, Some(vec![0, 2]));
2414        assert_eq!(new_config.limit, Some(10));
2415        assert_eq!(new_config.table_partition_cols, partition_cols);
2416        assert_eq!(new_config.file_groups.len(), 1);
2417        assert_eq!(new_config.file_groups[0].len(), 1);
2418        assert_eq!(
2419            new_config.file_groups[0][0].object_meta.location.as_ref(),
2420            "test_file.parquet"
2421        );
2422        assert_eq!(new_config.constraints, Constraints::default());
2423        assert!(new_config.new_lines_in_values);
2424    }
2425
2426    #[test]
2427    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2428        use datafusion_common::DFSchema;
2429        use datafusion_expr::{col, execution_props::ExecutionProps};
2430
2431        let schema = Arc::new(Schema::new(vec![Field::new(
2432            "value",
2433            DataType::Float64,
2434            false,
2435        )]));
2436
2437        // Setup sort expression
2438        let exec_props = ExecutionProps::new();
2439        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2440        let sort_expr = [col("value").sort(true, false)];
2441        let sort_ordering = sort_expr
2442            .map(|expr| {
2443                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
2444            })
2445            .into();
2446
2447        // Test case parameters
2448        struct TestCase {
2449            name: String,
2450            file_count: usize,
2451            overlap_factor: f64,
2452            target_partitions: usize,
2453            expected_partition_count: usize,
2454        }
2455
2456        let test_cases = vec![
2457            // Basic cases
2458            TestCase {
2459                name: "no_overlap_10_files_4_partitions".to_string(),
2460                file_count: 10,
2461                overlap_factor: 0.0,
2462                target_partitions: 4,
2463                expected_partition_count: 4,
2464            },
2465            TestCase {
2466                name: "medium_overlap_20_files_5_partitions".to_string(),
2467                file_count: 20,
2468                overlap_factor: 0.5,
2469                target_partitions: 5,
2470                expected_partition_count: 5,
2471            },
2472            TestCase {
2473                name: "high_overlap_30_files_3_partitions".to_string(),
2474                file_count: 30,
2475                overlap_factor: 0.8,
2476                target_partitions: 3,
2477                expected_partition_count: 7,
2478            },
2479            // Edge cases
2480            TestCase {
2481                name: "fewer_files_than_partitions".to_string(),
2482                file_count: 3,
2483                overlap_factor: 0.0,
2484                target_partitions: 10,
2485                expected_partition_count: 3, // Should only create as many partitions as files
2486            },
2487            TestCase {
2488                name: "single_file".to_string(),
2489                file_count: 1,
2490                overlap_factor: 0.0,
2491                target_partitions: 5,
2492                expected_partition_count: 1, // Should create only one partition
2493            },
2494            TestCase {
2495                name: "empty_files".to_string(),
2496                file_count: 0,
2497                overlap_factor: 0.0,
2498                target_partitions: 3,
2499                expected_partition_count: 0, // Empty result for empty input
2500            },
2501        ];
2502
2503        for case in test_cases {
2504            println!("Running test case: {}", case.name);
2505
2506            // Generate files using bench utility function
2507            let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2508
2509            // Call the function under test
2510            let result =
2511                FileScanConfig::split_groups_by_statistics_with_target_partitions(
2512                    &schema,
2513                    &file_groups,
2514                    &sort_ordering,
2515                    case.target_partitions,
2516                )?;
2517
2518            // Verify results
2519            println!(
2520                "Created {} partitions (target was {})",
2521                result.len(),
2522                case.target_partitions
2523            );
2524
2525            // Check partition count
2526            assert_eq!(
2527                result.len(),
2528                case.expected_partition_count,
2529                "Case '{}': Unexpected partition count",
2530                case.name
2531            );
2532
2533            // Verify sort integrity
2534            assert!(
2535                verify_sort_integrity(&result),
2536                "Case '{}': Files within partitions are not properly ordered",
2537                case.name
2538            );
2539
2540            // Distribution check for partitions
2541            if case.file_count > 1 && case.expected_partition_count > 1 {
2542                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2543                let max_size = *group_sizes.iter().max().unwrap();
2544                let min_size = *group_sizes.iter().min().unwrap();
2545
2546                // Check partition balancing - difference shouldn't be extreme
2547                let avg_files_per_partition =
2548                    case.file_count as f64 / case.expected_partition_count as f64;
2549                assert!(
2550                    (max_size as f64) < 2.0 * avg_files_per_partition,
2551                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2552                    case.name,
2553                    max_size,
2554                    avg_files_per_partition
2555                );
2556
2557                println!("Distribution - min files: {min_size}, max files: {max_size}");
2558            }
2559        }
2560
2561        // Test error case: zero target partitions
2562        let empty_groups: Vec<FileGroup> = vec![];
2563        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2564            &schema,
2565            &empty_groups,
2566            &sort_ordering,
2567            0,
2568        )
2569        .unwrap_err();
2570
2571        assert!(
2572            err.to_string()
2573                .contains("target_partitions must be greater than 0"),
2574            "Expected error for zero target partitions"
2575        );
2576
2577        Ok(())
2578    }
2579}