datafusion_datasource/
file_scan_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21use std::{
22    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
24};
25
26use arrow::{
27    array::{
28        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
29        RecordBatchOptions,
30    },
31    buffer::Buffer,
32    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
33};
34use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics};
35use datafusion_common::{DataFusionError, ScalarValue};
36use datafusion_execution::{
37    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
38};
39use datafusion_physical_expr::{
40    expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
41    PhysicalSortExpr,
42};
43use datafusion_physical_plan::{
44    display::{display_orderings, ProjectSchemaDisplay},
45    metrics::ExecutionPlanMetricsSet,
46    projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
47    DisplayAs, DisplayFormatType, ExecutionPlan,
48};
49use log::{debug, warn};
50
51use crate::file_groups::FileGroup;
52use crate::{
53    display::FileGroupsDisplay,
54    file::FileSource,
55    file_compression_type::FileCompressionType,
56    file_stream::FileStream,
57    source::{DataSource, DataSourceExec},
58    statistics::MinMaxStatistics,
59    PartitionedFile,
60};
61
62/// The base configurations for a [`DataSourceExec`], the a physical plan for
63/// any given file format.
64///
65/// Use [`Self::build`] to create a [`DataSourceExec`] from a ``FileScanConfig`.
66///
67/// # Example
68/// ```
69/// # use std::any::Any;
70/// # use std::sync::Arc;
71/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
72/// # use object_store::ObjectStore;
73/// # use datafusion_common::Statistics;
74/// # use datafusion_datasource::file::FileSource;
75/// # use datafusion_datasource::file_groups::FileGroup;
76/// # use datafusion_datasource::PartitionedFile;
77/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
78/// # use datafusion_datasource::file_stream::FileOpener;
79/// # use datafusion_datasource::source::DataSourceExec;
80/// # use datafusion_execution::object_store::ObjectStoreUrl;
81/// # use datafusion_physical_plan::ExecutionPlan;
82/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
83/// # let file_schema = Arc::new(Schema::new(vec![
84/// #  Field::new("c1", DataType::Int32, false),
85/// #  Field::new("c2", DataType::Int32, false),
86/// #  Field::new("c3", DataType::Int32, false),
87/// #  Field::new("c4", DataType::Int32, false),
88/// # ]));
89/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
90/// # struct ParquetSource {
91/// #    projected_statistics: Option<Statistics>
92/// # };
93/// # impl FileSource for ParquetSource {
94/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
95/// #  fn as_any(&self) -> &dyn Any { self  }
96/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
97/// #  fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { unimplemented!() }
98/// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
99/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics)} ) }
100/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
101/// #  fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
102/// #  fn file_type(&self) -> &str { "parquet" }
103/// #  }
104/// # impl ParquetSource {
105/// #  fn new() -> Self { Self {projected_statistics: None} }
106/// # }
107/// // create FileScan config for reading parquet files from file://
108/// let object_store_url = ObjectStoreUrl::local_filesystem();
109/// let file_source = Arc::new(ParquetSource::new());
110/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
111///   .with_limit(Some(1000))            // read only the first 1000 records
112///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
113///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
114///   .with_file(PartitionedFile::new("file1.parquet", 1234))
115///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
116///   // in a  single row group
117///   .with_file_group(FileGroup::new(vec![
118///    PartitionedFile::new("file2.parquet", 56),
119///    PartitionedFile::new("file3.parquet", 78),
120///   ])).build();
121/// // create an execution plan from the config
122/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
123/// ```
124#[derive(Clone)]
125pub struct FileScanConfig {
126    /// Object store URL, used to get an [`ObjectStore`] instance from
127    /// [`RuntimeEnv::object_store`]
128    ///
129    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
130    /// as `file://` or `s3://my_bucket`. It should not include the path to the
131    /// file itself. The relevant URL prefix must be registered via
132    /// [`RuntimeEnv::register_object_store`]
133    ///
134    /// [`ObjectStore`]: object_store::ObjectStore
135    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
136    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
137    pub object_store_url: ObjectStoreUrl,
138    /// Schema before `projection` is applied. It contains the all columns that may
139    /// appear in the files. It does not include table partition columns
140    /// that may be added.
141    /// Note that this is **not** the schema of the physical files.
142    /// This is the schema that the physical file schema will be
143    /// mapped onto, and the schema that the [`DataSourceExec`] will return.
144    pub file_schema: SchemaRef,
145    /// List of files to be processed, grouped into partitions
146    ///
147    /// Each file must have a schema of `file_schema` or a subset. If
148    /// a particular file has a subset, the missing columns are
149    /// padded with NULLs.
150    ///
151    /// DataFusion may attempt to read each partition of files
152    /// concurrently, however files *within* a partition will be read
153    /// sequentially, one after the next.
154    pub file_groups: Vec<FileGroup>,
155    /// Table constraints
156    pub constraints: Constraints,
157    /// Columns on which to project the data. Indexes that are higher than the
158    /// number of columns of `file_schema` refer to `table_partition_cols`.
159    pub projection: Option<Vec<usize>>,
160    /// The maximum number of records to read from this plan. If `None`,
161    /// all records after filtering are returned.
162    pub limit: Option<usize>,
163    /// The partitioning columns
164    pub table_partition_cols: Vec<Field>,
165    /// All equivalent lexicographical orderings that describe the schema.
166    pub output_ordering: Vec<LexOrdering>,
167    /// File compression type
168    pub file_compression_type: FileCompressionType,
169    /// Are new lines in values supported for CSVOptions
170    pub new_lines_in_values: bool,
171    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
172    pub file_source: Arc<dyn FileSource>,
173    /// Batch size while creating new batches
174    /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
175    pub batch_size: Option<usize>,
176}
177
178/// A builder for [`FileScanConfig`]'s.
179///
180/// Example:
181///
182/// ```rust
183/// # use std::sync::Arc;
184/// # use arrow::datatypes::{DataType, Field, Schema};
185/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig};
186/// # use datafusion_datasource::file_compression_type::FileCompressionType;
187/// # use datafusion_datasource::file_groups::FileGroup;
188/// # use datafusion_datasource::PartitionedFile;
189/// # use datafusion_execution::object_store::ObjectStoreUrl;
190/// # use datafusion_common::Statistics;
191/// # use datafusion_datasource::file::FileSource;
192///
193/// # fn main() {
194/// # fn with_source(file_source: Arc<dyn FileSource>) {
195///     // Create a schema for our Parquet files
196///     let schema = Arc::new(Schema::new(vec![
197///         Field::new("id", DataType::Int32, false),
198///         Field::new("value", DataType::Utf8, false),
199///     ]));
200///
201///     // Create a builder for scanning Parquet files from a local filesystem
202///     let config = FileScanConfigBuilder::new(
203///         ObjectStoreUrl::local_filesystem(),
204///         schema,
205///         file_source,
206///     )
207///     // Set a limit of 1000 rows
208///     .with_limit(Some(1000))
209///     // Project only the first column
210///     .with_projection(Some(vec![0]))
211///     // Add partition columns
212///     .with_table_partition_cols(vec![
213///         Field::new("date", DataType::Utf8, false),
214///     ])
215///     // Add a file group with two files
216///     .with_file_group(FileGroup::new(vec![
217///         PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024),
218///         PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048),
219///     ]))
220///     // Set compression type
221///     .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
222///     // Build the final config
223///     .build();
224/// # }
225/// # }
226/// ```
227#[derive(Clone)]
228pub struct FileScanConfigBuilder {
229    object_store_url: ObjectStoreUrl,
230    /// Table schema before any projections or partition columns are applied.
231    /// This schema is used to read the files, but is **not** necessarily the schema of the physical files.
232    /// Rather this is the schema that the physical file schema will be mapped onto, and the schema that the
233    /// [`DataSourceExec`] will return.
234    file_schema: SchemaRef,
235    file_source: Arc<dyn FileSource>,
236
237    limit: Option<usize>,
238    projection: Option<Vec<usize>>,
239    table_partition_cols: Vec<Field>,
240    constraints: Option<Constraints>,
241    file_groups: Vec<FileGroup>,
242    statistics: Option<Statistics>,
243    output_ordering: Vec<LexOrdering>,
244    file_compression_type: Option<FileCompressionType>,
245    new_lines_in_values: Option<bool>,
246    batch_size: Option<usize>,
247}
248
249impl FileScanConfigBuilder {
250    /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files.
251    ///
252    /// # Parameters:
253    /// * `object_store_url`: See [`FileScanConfig::object_store_url`]
254    /// * `file_schema`: See [`FileScanConfig::file_schema`]
255    /// * `file_source`: See [`FileScanConfig::file_source`]
256    pub fn new(
257        object_store_url: ObjectStoreUrl,
258        file_schema: SchemaRef,
259        file_source: Arc<dyn FileSource>,
260    ) -> Self {
261        Self {
262            object_store_url,
263            file_schema,
264            file_source,
265            file_groups: vec![],
266            statistics: None,
267            output_ordering: vec![],
268            file_compression_type: None,
269            new_lines_in_values: None,
270            limit: None,
271            projection: None,
272            table_partition_cols: vec![],
273            constraints: None,
274            batch_size: None,
275        }
276    }
277
278    /// Set the maximum number of records to read from this plan. If `None`,
279    /// all records after filtering are returned.
280    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
281        self.limit = limit;
282        self
283    }
284
285    /// Set the file source for scanning files.
286    ///
287    /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
288    /// after the builder has been created.
289    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
290        self.file_source = file_source;
291        self
292    }
293
294    /// Set the columns on which to project the data. Indexes that are higher than the
295    /// number of columns of `file_schema` refer to `table_partition_cols`.
296    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
297        self.projection = projection;
298        self
299    }
300
301    /// Set the partitioning columns
302    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
303        self.table_partition_cols = table_partition_cols;
304        self
305    }
306
307    /// Set the table constraints
308    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
309        self.constraints = Some(constraints);
310        self
311    }
312
313    /// Set the estimated overall statistics of the files, taking `filters` into account.
314    /// Defaults to [`Statistics::new_unknown`].
315    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
316        self.statistics = Some(statistics);
317        self
318    }
319
320    /// Set the list of files to be processed, grouped into partitions.
321    ///
322    /// Each file must have a schema of `file_schema` or a subset. If
323    /// a particular file has a subset, the missing columns are
324    /// padded with NULLs.
325    ///
326    /// DataFusion may attempt to read each partition of files
327    /// concurrently, however files *within* a partition will be read
328    /// sequentially, one after the next.
329    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
330        self.file_groups = file_groups;
331        self
332    }
333
334    /// Add a new file group
335    ///
336    /// See [`Self::with_file_groups`] for more information
337    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
338        self.file_groups.push(file_group);
339        self
340    }
341
342    /// Add a file as a single group
343    ///
344    /// See [`Self::with_file_groups`] for more information.
345    pub fn with_file(self, file: PartitionedFile) -> Self {
346        self.with_file_group(FileGroup::new(vec![file]))
347    }
348
349    /// Set the output ordering of the files
350    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
351        self.output_ordering = output_ordering;
352        self
353    }
354
355    /// Set the file compression type
356    pub fn with_file_compression_type(
357        mut self,
358        file_compression_type: FileCompressionType,
359    ) -> Self {
360        self.file_compression_type = Some(file_compression_type);
361        self
362    }
363
364    /// Set whether new lines in values are supported for CSVOptions
365    ///
366    /// Parsing newlines in quoted values may be affected by execution behaviour such as
367    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
368    /// parsed successfully, which may reduce performance.
369    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
370        self.new_lines_in_values = Some(new_lines_in_values);
371        self
372    }
373
374    /// Set the batch_size property
375    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
376        self.batch_size = batch_size;
377        self
378    }
379
380    /// Build the final [`FileScanConfig`] with all the configured settings.
381    ///
382    /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
383    /// Any unset optional fields will use their default values.
384    pub fn build(self) -> FileScanConfig {
385        let Self {
386            object_store_url,
387            file_schema,
388            file_source,
389            limit,
390            projection,
391            table_partition_cols,
392            constraints,
393            file_groups,
394            statistics,
395            output_ordering,
396            file_compression_type,
397            new_lines_in_values,
398            batch_size,
399        } = self;
400
401        let constraints = constraints.unwrap_or_default();
402        let statistics =
403            statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
404
405        let file_source = file_source.with_statistics(statistics.clone());
406        let file_compression_type =
407            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
408        let new_lines_in_values = new_lines_in_values.unwrap_or(false);
409
410        FileScanConfig {
411            object_store_url,
412            file_schema,
413            file_source,
414            limit,
415            projection,
416            table_partition_cols,
417            constraints,
418            file_groups,
419            output_ordering,
420            file_compression_type,
421            new_lines_in_values,
422            batch_size,
423        }
424    }
425}
426
427impl From<FileScanConfig> for FileScanConfigBuilder {
428    fn from(config: FileScanConfig) -> Self {
429        Self {
430            object_store_url: config.object_store_url,
431            file_schema: config.file_schema,
432            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
433            file_groups: config.file_groups,
434            statistics: config.file_source.statistics().ok(),
435            output_ordering: config.output_ordering,
436            file_compression_type: Some(config.file_compression_type),
437            new_lines_in_values: Some(config.new_lines_in_values),
438            limit: config.limit,
439            projection: config.projection,
440            table_partition_cols: config.table_partition_cols,
441            constraints: Some(config.constraints),
442            batch_size: config.batch_size,
443        }
444    }
445}
446
447impl DataSource for FileScanConfig {
448    fn open(
449        &self,
450        partition: usize,
451        context: Arc<TaskContext>,
452    ) -> Result<SendableRecordBatchStream> {
453        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
454        let batch_size = self
455            .batch_size
456            .unwrap_or_else(|| context.session_config().batch_size());
457
458        let source = self
459            .file_source
460            .with_batch_size(batch_size)
461            .with_schema(Arc::clone(&self.file_schema))
462            .with_projection(self);
463
464        let opener = source.create_file_opener(object_store, self, partition);
465
466        let stream = FileStream::new(self, partition, opener, source.metrics())?;
467        Ok(Box::pin(stream))
468    }
469
470    fn as_any(&self) -> &dyn Any {
471        self
472    }
473
474    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
475        match t {
476            DisplayFormatType::Default | DisplayFormatType::Verbose => {
477                let (schema, _, _, orderings) = self.project();
478
479                write!(f, "file_groups=")?;
480                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
481
482                if !schema.fields().is_empty() {
483                    write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
484                }
485
486                if let Some(limit) = self.limit {
487                    write!(f, ", limit={limit}")?;
488                }
489
490                display_orderings(f, &orderings)?;
491
492                if !self.constraints.is_empty() {
493                    write!(f, ", {}", self.constraints)?;
494                }
495
496                self.fmt_file_source(t, f)
497            }
498            DisplayFormatType::TreeRender => {
499                writeln!(f, "format={}", self.file_source.file_type())?;
500                self.file_source.fmt_extra(t, f)?;
501                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
502                writeln!(f, "files={num_files}")?;
503                Ok(())
504            }
505        }
506    }
507
508    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
509    fn repartitioned(
510        &self,
511        target_partitions: usize,
512        repartition_file_min_size: usize,
513        output_ordering: Option<LexOrdering>,
514    ) -> Result<Option<Arc<dyn DataSource>>> {
515        let source = self.file_source.repartitioned(
516            target_partitions,
517            repartition_file_min_size,
518            output_ordering,
519            self,
520        )?;
521
522        Ok(source.map(|s| Arc::new(s) as _))
523    }
524
525    fn output_partitioning(&self) -> Partitioning {
526        Partitioning::UnknownPartitioning(self.file_groups.len())
527    }
528
529    fn eq_properties(&self) -> EquivalenceProperties {
530        let (schema, constraints, _, orderings) = self.project();
531        EquivalenceProperties::new_with_orderings(schema, orderings.as_slice())
532            .with_constraints(constraints)
533    }
534
535    fn statistics(&self) -> Result<Statistics> {
536        Ok(self.projected_stats())
537    }
538
539    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
540        let source = FileScanConfigBuilder::from(self.clone())
541            .with_limit(limit)
542            .build();
543        Some(Arc::new(source))
544    }
545
546    fn fetch(&self) -> Option<usize> {
547        self.limit
548    }
549
550    fn metrics(&self) -> ExecutionPlanMetricsSet {
551        self.file_source.metrics().clone()
552    }
553
554    fn try_swapping_with_projection(
555        &self,
556        projection: &ProjectionExec,
557    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
558        // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
559
560        // Must be all column references, with no table partition columns (which can not be projected)
561        let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
562            expr.as_any()
563                .downcast_ref::<Column>()
564                .map(|expr| expr.index() >= self.file_schema.fields().len())
565                .unwrap_or(false)
566        });
567
568        // If there is any non-column or alias-carrier expression, Projection should not be removed.
569        let no_aliases = all_alias_free_columns(projection.expr());
570
571        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
572            let file_scan = self.clone();
573            let source = Arc::clone(&file_scan.file_source);
574            let new_projections = new_projections_for_columns(
575                projection,
576                &file_scan
577                    .projection
578                    .clone()
579                    .unwrap_or((0..self.file_schema.fields().len()).collect()),
580            );
581            DataSourceExec::from_data_source(
582                FileScanConfigBuilder::from(file_scan)
583                    // Assign projected statistics to source
584                    .with_projection(Some(new_projections))
585                    .with_source(source)
586                    .build(),
587            ) as _
588        }))
589    }
590}
591
592impl FileScanConfig {
593    /// Create a new [`FileScanConfig`] with default settings for scanning files.
594    ///
595    /// See example on [`FileScanConfig`]
596    ///
597    /// No file groups are added by default. See [`Self::with_file`], [`Self::with_file_group`] and
598    /// [`Self::with_file_groups`].
599    ///
600    /// # Parameters:
601    /// * `object_store_url`: See [`Self::object_store_url`]
602    /// * `file_schema`: See [`Self::file_schema`]
603    #[allow(deprecated)] // `new` will be removed same time as `with_source`
604    pub fn new(
605        object_store_url: ObjectStoreUrl,
606        file_schema: SchemaRef,
607        file_source: Arc<dyn FileSource>,
608    ) -> Self {
609        let statistics = Statistics::new_unknown(&file_schema);
610        let file_source = file_source.with_statistics(statistics.clone());
611        Self {
612            object_store_url,
613            file_schema,
614            file_groups: vec![],
615            constraints: Constraints::empty(),
616            projection: None,
617            limit: None,
618            table_partition_cols: vec![],
619            output_ordering: vec![],
620            file_compression_type: FileCompressionType::UNCOMPRESSED,
621            new_lines_in_values: false,
622            file_source: Arc::clone(&file_source),
623            batch_size: None,
624        }
625    }
626
627    /// Set the file source
628    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
629    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
630        self.file_source =
631            file_source.with_statistics(Statistics::new_unknown(&self.file_schema));
632        self
633    }
634
635    /// Set the table constraints of the files
636    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
637    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
638        self.constraints = constraints;
639        self
640    }
641
642    /// Set the statistics of the files
643    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
644    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
645        self.file_source = self.file_source.with_statistics(statistics);
646        self
647    }
648
649    fn projection_indices(&self) -> Vec<usize> {
650        match &self.projection {
651            Some(proj) => proj.clone(),
652            None => (0..self.file_schema.fields().len()
653                + self.table_partition_cols.len())
654                .collect(),
655        }
656    }
657
658    pub fn projected_stats(&self) -> Statistics {
659        let statistics = self.file_source.statistics().unwrap();
660
661        let table_cols_stats = self
662            .projection_indices()
663            .into_iter()
664            .map(|idx| {
665                if idx < self.file_schema.fields().len() {
666                    statistics.column_statistics[idx].clone()
667                } else {
668                    // TODO provide accurate stat for partition column (#1186)
669                    ColumnStatistics::new_unknown()
670                }
671            })
672            .collect();
673
674        Statistics {
675            num_rows: statistics.num_rows,
676            // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
677            total_byte_size: statistics.total_byte_size,
678            column_statistics: table_cols_stats,
679        }
680    }
681
682    pub fn projected_schema(&self) -> Arc<Schema> {
683        let table_fields: Vec<_> = self
684            .projection_indices()
685            .into_iter()
686            .map(|idx| {
687                if idx < self.file_schema.fields().len() {
688                    self.file_schema.field(idx).clone()
689                } else {
690                    let partition_idx = idx - self.file_schema.fields().len();
691                    self.table_partition_cols[partition_idx].clone()
692                }
693            })
694            .collect();
695
696        Arc::new(Schema::new_with_metadata(
697            table_fields,
698            self.file_schema.metadata().clone(),
699        ))
700    }
701
702    pub fn projected_constraints(&self) -> Constraints {
703        let indexes = self.projection_indices();
704
705        self.constraints
706            .project(&indexes)
707            .unwrap_or_else(Constraints::empty)
708    }
709
710    /// Set the projection of the files
711    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
712    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
713        self.projection = projection;
714        self
715    }
716
717    /// Set the limit of the files
718    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
719    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
720        self.limit = limit;
721        self
722    }
723
724    /// Add a file as a single group
725    ///
726    /// See [Self::file_groups] for more information.
727    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
728    #[allow(deprecated)]
729    pub fn with_file(self, file: PartitionedFile) -> Self {
730        self.with_file_group(FileGroup::new(vec![file]))
731    }
732
733    /// Add the file groups
734    ///
735    /// See [Self::file_groups] for more information.
736    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
737    pub fn with_file_groups(mut self, mut file_groups: Vec<FileGroup>) -> Self {
738        self.file_groups.append(&mut file_groups);
739        self
740    }
741
742    /// Add a new file group
743    ///
744    /// See [Self::file_groups] for more information
745    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
746    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
747        self.file_groups.push(file_group);
748        self
749    }
750
751    /// Set the partitioning columns of the files
752    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
753    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
754        self.table_partition_cols = table_partition_cols;
755        self
756    }
757
758    /// Set the output ordering of the files
759    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
760    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
761        self.output_ordering = output_ordering;
762        self
763    }
764
765    /// Set the file compression type
766    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
767    pub fn with_file_compression_type(
768        mut self,
769        file_compression_type: FileCompressionType,
770    ) -> Self {
771        self.file_compression_type = file_compression_type;
772        self
773    }
774
775    /// Set the new_lines_in_values property
776    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
777    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
778        self.new_lines_in_values = new_lines_in_values;
779        self
780    }
781
782    /// Set the batch_size property
783    #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
784    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
785        self.batch_size = batch_size;
786        self
787    }
788
789    /// Specifies whether newlines in (quoted) values are supported.
790    ///
791    /// Parsing newlines in quoted values may be affected by execution behaviour such as
792    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
793    /// parsed successfully, which may reduce performance.
794    ///
795    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
796    pub fn newlines_in_values(&self) -> bool {
797        self.new_lines_in_values
798    }
799
800    /// Project the schema, constraints, and the statistics on the given column indices
801    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
802        if self.projection.is_none() && self.table_partition_cols.is_empty() {
803            return (
804                Arc::clone(&self.file_schema),
805                self.constraints.clone(),
806                self.file_source.statistics().unwrap().clone(),
807                self.output_ordering.clone(),
808            );
809        }
810
811        let schema = self.projected_schema();
812        let constraints = self.projected_constraints();
813        let stats = self.projected_stats();
814
815        let output_ordering = get_projected_output_ordering(self, &schema);
816
817        (schema, constraints, stats, output_ordering)
818    }
819
820    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
821        self.projection.as_ref().map(|p| {
822            p.iter()
823                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
824                .map(|col_idx| self.file_schema.field(*col_idx).name())
825                .cloned()
826                .collect()
827        })
828    }
829
830    /// Projects only file schema, ignoring partition columns
831    pub fn projected_file_schema(&self) -> SchemaRef {
832        let fields = self.file_column_projection_indices().map(|indices| {
833            indices
834                .iter()
835                .map(|col_idx| self.file_schema.field(*col_idx))
836                .cloned()
837                .collect::<Vec<_>>()
838        });
839
840        fields.map_or_else(
841            || Arc::clone(&self.file_schema),
842            |f| {
843                Arc::new(Schema::new_with_metadata(
844                    f,
845                    self.file_schema.metadata.clone(),
846                ))
847            },
848        )
849    }
850
851    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
852        self.projection.as_ref().map(|p| {
853            p.iter()
854                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
855                .copied()
856                .collect()
857        })
858    }
859
860    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
861    ///
862    /// The method distributes files across a target number of partitions while ensuring
863    /// files within each partition maintain sort order based on their min/max statistics.
864    ///
865    /// The algorithm works by:
866    /// 1. Takes files sorted by minimum values
867    /// 2. For each file:
868    ///   - Finds eligible groups (empty or where file's min > group's last max)
869    ///   - Selects the smallest eligible group
870    ///   - Creates a new group if needed
871    ///
872    /// # Parameters
873    /// * `table_schema`: Schema containing information about the columns
874    /// * `file_groups`: The original file groups to split
875    /// * `sort_order`: The lexicographical ordering to maintain within each group
876    /// * `target_partitions`: The desired number of output partitions
877    ///
878    /// # Returns
879    /// A new set of file groups, where files within each group are non-overlapping with respect to
880    /// their min/max statistics and maintain the specified sort order.
881    pub fn split_groups_by_statistics_with_target_partitions(
882        table_schema: &SchemaRef,
883        file_groups: &[FileGroup],
884        sort_order: &LexOrdering,
885        target_partitions: usize,
886    ) -> Result<Vec<FileGroup>> {
887        if target_partitions == 0 {
888            return Err(DataFusionError::Internal(
889                "target_partitions must be greater than 0".to_string(),
890            ));
891        }
892
893        let flattened_files = file_groups
894            .iter()
895            .flat_map(FileGroup::iter)
896            .collect::<Vec<_>>();
897
898        if flattened_files.is_empty() {
899            return Ok(vec![]);
900        }
901
902        let statistics = MinMaxStatistics::new_from_files(
903            sort_order,
904            table_schema,
905            None,
906            flattened_files.iter().copied(),
907        )?;
908
909        let indices_sorted_by_min = statistics.min_values_sorted();
910
911        // Initialize with target_partitions empty groups
912        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
913
914        for (idx, min) in indices_sorted_by_min {
915            if let Some((_, group)) = file_groups_indices
916                .iter_mut()
917                .enumerate()
918                .filter(|(_, group)| {
919                    group.is_empty()
920                        || min
921                            > statistics
922                                .max(*group.last().expect("groups should not be empty"))
923                })
924                .min_by_key(|(_, group)| group.len())
925            {
926                group.push(idx);
927            } else {
928                // Create a new group if no existing group fits
929                file_groups_indices.push(vec![idx]);
930            }
931        }
932
933        // Remove any empty groups
934        file_groups_indices.retain(|group| !group.is_empty());
935
936        // Assemble indices back into groups of PartitionedFiles
937        Ok(file_groups_indices
938            .into_iter()
939            .map(|file_group_indices| {
940                FileGroup::new(
941                    file_group_indices
942                        .into_iter()
943                        .map(|idx| flattened_files[idx].clone())
944                        .collect(),
945                )
946            })
947            .collect())
948    }
949
950    /// Attempts to do a bin-packing on files into file groups, such that any two files
951    /// in a file group are ordered and non-overlapping with respect to their statistics.
952    /// It will produce the smallest number of file groups possible.
953    pub fn split_groups_by_statistics(
954        table_schema: &SchemaRef,
955        file_groups: &[FileGroup],
956        sort_order: &LexOrdering,
957    ) -> Result<Vec<FileGroup>> {
958        let flattened_files = file_groups
959            .iter()
960            .flat_map(FileGroup::iter)
961            .collect::<Vec<_>>();
962        // First Fit:
963        // * Choose the first file group that a file can be placed into.
964        // * If it fits into no existing file groups, create a new one.
965        //
966        // By sorting files by min values and then applying first-fit bin packing,
967        // we can produce the smallest number of file groups such that
968        // files within a group are in order and non-overlapping.
969        //
970        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
971        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
972
973        if flattened_files.is_empty() {
974            return Ok(vec![]);
975        }
976
977        let statistics = MinMaxStatistics::new_from_files(
978            sort_order,
979            table_schema,
980            None,
981            flattened_files.iter().copied(),
982        )
983        .map_err(|e| {
984            e.context("construct min/max statistics for split_groups_by_statistics")
985        })?;
986
987        let indices_sorted_by_min = statistics.min_values_sorted();
988        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
989
990        for (idx, min) in indices_sorted_by_min {
991            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
992                // If our file is non-overlapping and comes _after_ the last file,
993                // it fits in this file group.
994                min > statistics.max(
995                    *group
996                        .last()
997                        .expect("groups should be nonempty at construction"),
998                )
999            });
1000            match file_group_to_insert {
1001                Some(group) => group.push(idx),
1002                None => file_groups_indices.push(vec![idx]),
1003            }
1004        }
1005
1006        // Assemble indices back into groups of PartitionedFiles
1007        Ok(file_groups_indices
1008            .into_iter()
1009            .map(|file_group_indices| {
1010                file_group_indices
1011                    .into_iter()
1012                    .map(|idx| flattened_files[idx].clone())
1013                    .collect()
1014            })
1015            .collect())
1016    }
1017
1018    /// Returns a new [`DataSourceExec`] to scan the files specified by this config
1019    #[deprecated(since = "47.0.0", note = "use DataSourceExec::new instead")]
1020    pub fn build(self) -> Arc<DataSourceExec> {
1021        DataSourceExec::from_data_source(self)
1022    }
1023
1024    /// Write the data_type based on file_source
1025    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1026        write!(f, ", file_type={}", self.file_source.file_type())?;
1027        self.file_source.fmt_extra(t, f)
1028    }
1029
1030    /// Returns the file_source
1031    pub fn file_source(&self) -> &Arc<dyn FileSource> {
1032        &self.file_source
1033    }
1034}
1035
1036impl Debug for FileScanConfig {
1037    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1038        write!(f, "FileScanConfig {{")?;
1039        write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1040
1041        write!(
1042            f,
1043            "statistics={:?}, ",
1044            self.file_source.statistics().unwrap()
1045        )?;
1046
1047        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1048        write!(f, "}}")
1049    }
1050}
1051
1052impl DisplayAs for FileScanConfig {
1053    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1054        let (schema, _, _, orderings) = self.project();
1055
1056        write!(f, "file_groups=")?;
1057        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1058
1059        if !schema.fields().is_empty() {
1060            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1061        }
1062
1063        if let Some(limit) = self.limit {
1064            write!(f, ", limit={limit}")?;
1065        }
1066
1067        display_orderings(f, &orderings)?;
1068
1069        if !self.constraints.is_empty() {
1070            write!(f, ", {}", self.constraints)?;
1071        }
1072
1073        Ok(())
1074    }
1075}
1076
1077/// A helper that projects partition columns into the file record batches.
1078///
1079/// One interesting trick is the usage of a cache for the key buffers of the partition column
1080/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
1081/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
1082/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
1083pub struct PartitionColumnProjector {
1084    /// An Arrow buffer initialized to zeros that represents the key array of all partition
1085    /// columns (partition columns are materialized by dictionary arrays with only one
1086    /// value in the dictionary, thus all the keys are equal to zero).
1087    key_buffer_cache: ZeroBufferGenerators,
1088    /// Mapping between the indexes in the list of partition columns and the target
1089    /// schema. Sorted by index in the target schema so that we can iterate on it to
1090    /// insert the partition columns in the target record batch.
1091    projected_partition_indexes: Vec<(usize, usize)>,
1092    /// The schema of the table once the projection was applied.
1093    projected_schema: SchemaRef,
1094}
1095
1096impl PartitionColumnProjector {
1097    // Create a projector to insert the partitioning columns into batches read from files
1098    // - `projected_schema`: the target schema with both file and partitioning columns
1099    // - `table_partition_cols`: all the partitioning column names
1100    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
1101        let mut idx_map = HashMap::new();
1102        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
1103            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
1104                idx_map.insert(partition_idx, schema_idx);
1105            }
1106        }
1107
1108        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
1109        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
1110
1111        Self {
1112            projected_partition_indexes,
1113            key_buffer_cache: Default::default(),
1114            projected_schema,
1115        }
1116    }
1117
1118    // Transform the batch read from the file by inserting the partitioning columns
1119    // to the right positions as deduced from `projected_schema`
1120    // - `file_batch`: batch read from the file, with internal projection applied
1121    // - `partition_values`: the list of partition values, one for each partition column
1122    pub fn project(
1123        &mut self,
1124        file_batch: RecordBatch,
1125        partition_values: &[ScalarValue],
1126    ) -> Result<RecordBatch> {
1127        let expected_cols =
1128            self.projected_schema.fields().len() - self.projected_partition_indexes.len();
1129
1130        if file_batch.columns().len() != expected_cols {
1131            return exec_err!(
1132                "Unexpected batch schema from file, expected {} cols but got {}",
1133                expected_cols,
1134                file_batch.columns().len()
1135            );
1136        }
1137
1138        let mut cols = file_batch.columns().to_vec();
1139        for &(pidx, sidx) in &self.projected_partition_indexes {
1140            let p_value =
1141                partition_values
1142                    .get(pidx)
1143                    .ok_or(DataFusionError::Execution(
1144                        "Invalid partitioning found on disk".to_string(),
1145                    ))?;
1146
1147            let mut partition_value = Cow::Borrowed(p_value);
1148
1149            // check if user forgot to dict-encode the partition value
1150            let field = self.projected_schema.field(sidx);
1151            let expected_data_type = field.data_type();
1152            let actual_data_type = partition_value.data_type();
1153            if let DataType::Dictionary(key_type, _) = expected_data_type {
1154                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
1155                    warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
1156                    partition_value = Cow::Owned(ScalarValue::Dictionary(
1157                        key_type.clone(),
1158                        Box::new(partition_value.as_ref().clone()),
1159                    ));
1160                }
1161            }
1162
1163            cols.insert(
1164                sidx,
1165                create_output_array(
1166                    &mut self.key_buffer_cache,
1167                    partition_value.as_ref(),
1168                    file_batch.num_rows(),
1169                )?,
1170            )
1171        }
1172
1173        RecordBatch::try_new_with_options(
1174            Arc::clone(&self.projected_schema),
1175            cols,
1176            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
1177        )
1178        .map_err(Into::into)
1179    }
1180}
1181
1182#[derive(Debug, Default)]
1183struct ZeroBufferGenerators {
1184    gen_i8: ZeroBufferGenerator<i8>,
1185    gen_i16: ZeroBufferGenerator<i16>,
1186    gen_i32: ZeroBufferGenerator<i32>,
1187    gen_i64: ZeroBufferGenerator<i64>,
1188    gen_u8: ZeroBufferGenerator<u8>,
1189    gen_u16: ZeroBufferGenerator<u16>,
1190    gen_u32: ZeroBufferGenerator<u32>,
1191    gen_u64: ZeroBufferGenerator<u64>,
1192}
1193
1194/// Generate a arrow [`Buffer`] that contains zero values.
1195#[derive(Debug, Default)]
1196struct ZeroBufferGenerator<T>
1197where
1198    T: ArrowNativeType,
1199{
1200    cache: Option<Buffer>,
1201    _t: PhantomData<T>,
1202}
1203
1204impl<T> ZeroBufferGenerator<T>
1205where
1206    T: ArrowNativeType,
1207{
1208    const SIZE: usize = size_of::<T>();
1209
1210    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
1211        match &mut self.cache {
1212            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
1213                buf.slice_with_length(0, n_vals * Self::SIZE)
1214            }
1215            _ => {
1216                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
1217                key_buffer_builder.advance(n_vals); // keys are all 0
1218                self.cache.insert(key_buffer_builder.finish()).clone()
1219            }
1220        }
1221    }
1222}
1223
1224fn create_dict_array<T>(
1225    buffer_gen: &mut ZeroBufferGenerator<T>,
1226    dict_val: &ScalarValue,
1227    len: usize,
1228    data_type: DataType,
1229) -> Result<ArrayRef>
1230where
1231    T: ArrowNativeType,
1232{
1233    let dict_vals = dict_val.to_array()?;
1234
1235    let sliced_key_buffer = buffer_gen.get_buffer(len);
1236
1237    // assemble pieces together
1238    let mut builder = ArrayData::builder(data_type)
1239        .len(len)
1240        .add_buffer(sliced_key_buffer);
1241    builder = builder.add_child_data(dict_vals.to_data());
1242    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
1243        builder.build().unwrap(),
1244    )))
1245}
1246
1247fn create_output_array(
1248    key_buffer_cache: &mut ZeroBufferGenerators,
1249    val: &ScalarValue,
1250    len: usize,
1251) -> Result<ArrayRef> {
1252    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
1253        match key_type.as_ref() {
1254            DataType::Int8 => {
1255                return create_dict_array(
1256                    &mut key_buffer_cache.gen_i8,
1257                    dict_val,
1258                    len,
1259                    val.data_type(),
1260                );
1261            }
1262            DataType::Int16 => {
1263                return create_dict_array(
1264                    &mut key_buffer_cache.gen_i16,
1265                    dict_val,
1266                    len,
1267                    val.data_type(),
1268                );
1269            }
1270            DataType::Int32 => {
1271                return create_dict_array(
1272                    &mut key_buffer_cache.gen_i32,
1273                    dict_val,
1274                    len,
1275                    val.data_type(),
1276                );
1277            }
1278            DataType::Int64 => {
1279                return create_dict_array(
1280                    &mut key_buffer_cache.gen_i64,
1281                    dict_val,
1282                    len,
1283                    val.data_type(),
1284                );
1285            }
1286            DataType::UInt8 => {
1287                return create_dict_array(
1288                    &mut key_buffer_cache.gen_u8,
1289                    dict_val,
1290                    len,
1291                    val.data_type(),
1292                );
1293            }
1294            DataType::UInt16 => {
1295                return create_dict_array(
1296                    &mut key_buffer_cache.gen_u16,
1297                    dict_val,
1298                    len,
1299                    val.data_type(),
1300                );
1301            }
1302            DataType::UInt32 => {
1303                return create_dict_array(
1304                    &mut key_buffer_cache.gen_u32,
1305                    dict_val,
1306                    len,
1307                    val.data_type(),
1308                );
1309            }
1310            DataType::UInt64 => {
1311                return create_dict_array(
1312                    &mut key_buffer_cache.gen_u64,
1313                    dict_val,
1314                    len,
1315                    val.data_type(),
1316                );
1317            }
1318            _ => {}
1319        }
1320    }
1321
1322    val.to_array_of_size(len)
1323}
1324
1325/// The various listing tables does not attempt to read all files
1326/// concurrently, instead they will read files in sequence within a
1327/// partition.  This is an important property as it allows plans to
1328/// run against 1000s of files and not try to open them all
1329/// concurrently.
1330///
1331/// However, it means if we assign more than one file to a partition
1332/// the output sort order will not be preserved as illustrated in the
1333/// following diagrams:
1334///
1335/// When only 1 file is assigned to each partition, each partition is
1336/// correctly sorted on `(A, B, C)`
1337///
1338/// ```text
1339///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
1340///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
1341///┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
1342///  │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
1343///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
1344///  │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
1345///┃                                          │                    │                     ┃
1346///  │                   │ │                    │                    │                 │
1347///┃                                          │                    │                     ┃
1348///  │                   │ │                    │                    │                 │
1349///┃                                          │                    │                     ┃
1350///  │                   │ │                    │                    │                 │
1351///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1352///     DataFusion           DataFusion           DataFusion           DataFusion
1353///┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
1354/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1355///
1356///                                      DataSourceExec
1357///```
1358///
1359/// However, when more than 1 file is assigned to each partition, each
1360/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
1361/// file is scanned, the same values for A, B and C can be repeated in
1362/// the same sorted stream
1363///
1364///```text
1365///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
1366///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
1367///┃   ┌───────────────┐     ┌──────────────┐ │
1368///  │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
1369///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1370///  │ └───────────────┘ │ │ └──────────────┘   ┃
1371///┃   ┌───────────────┐     ┌──────────────┐ │
1372///  │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
1373///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
1374///  │ └───────────────┘ │ │ └──────────────┘   ┃
1375///┃                                          │
1376///  │                   │ │                    ┃
1377///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
1378///     DataFusion           DataFusion         ┃
1379///┃    Partition 1          Partition 2
1380/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
1381///
1382///              DataSourceExec
1383///```
1384fn get_projected_output_ordering(
1385    base_config: &FileScanConfig,
1386    projected_schema: &SchemaRef,
1387) -> Vec<LexOrdering> {
1388    let mut all_orderings = vec![];
1389    for output_ordering in &base_config.output_ordering {
1390        let mut new_ordering = LexOrdering::default();
1391        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
1392            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1393                let name = col.name();
1394                if let Some((idx, _)) = projected_schema.column_with_name(name) {
1395                    // Compute the new sort expression (with correct index) after projection:
1396                    new_ordering.push(PhysicalSortExpr {
1397                        expr: Arc::new(Column::new(name, idx)),
1398                        options: *options,
1399                    });
1400                    continue;
1401                }
1402            }
1403            // Cannot find expression in the projected_schema, stop iterating
1404            // since rest of the orderings are violated
1405            break;
1406        }
1407
1408        // do not push empty entries
1409        // otherwise we may have `Some(vec![])` at the output ordering.
1410        if new_ordering.is_empty() {
1411            continue;
1412        }
1413
1414        // Check if any file groups are not sorted
1415        if base_config.file_groups.iter().any(|group| {
1416            if group.len() <= 1 {
1417                // File groups with <= 1 files are always sorted
1418                return false;
1419            }
1420
1421            let statistics = match MinMaxStatistics::new_from_files(
1422                &new_ordering,
1423                projected_schema,
1424                base_config.projection.as_deref(),
1425                group.iter(),
1426            ) {
1427                Ok(statistics) => statistics,
1428                Err(e) => {
1429                    log::trace!("Error fetching statistics for file group: {e}");
1430                    // we can't prove that it's ordered, so we have to reject it
1431                    return true;
1432                }
1433            };
1434
1435            !statistics.is_sorted()
1436        }) {
1437            debug!(
1438                "Skipping specified output ordering {:?}. \
1439                Some file groups couldn't be determined to be sorted: {:?}",
1440                base_config.output_ordering[0], base_config.file_groups
1441            );
1442            continue;
1443        }
1444
1445        all_orderings.push(new_ordering);
1446    }
1447    all_orderings
1448}
1449
1450/// Convert type to a type suitable for use as a `ListingTable`
1451/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
1452/// a reasonable trade off between a reasonable number of partition
1453/// values and space efficiency.
1454///
1455/// This use this to specify types for partition columns. However
1456/// you MAY also choose not to dictionary-encode the data or to use a
1457/// different dictionary type.
1458///
1459/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same say.
1460pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1461    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1462}
1463
1464/// Convert a [`ScalarValue`] of partition columns to a type, as
1465/// described in the documentation of [`wrap_partition_type_in_dict`],
1466/// which can wrap the types.
1467pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1468    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1469}
1470
1471#[cfg(test)]
1472mod tests {
1473    use crate::{
1474        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1475        verify_sort_integrity,
1476    };
1477
1478    use super::*;
1479    use arrow::{
1480        array::{Int32Array, RecordBatch},
1481        compute::SortOptions,
1482    };
1483
1484    use datafusion_common::stats::Precision;
1485    use datafusion_common::{assert_batches_eq, DFSchema};
1486    use datafusion_expr::{execution_props::ExecutionProps, SortExpr};
1487    use datafusion_physical_expr::create_physical_expr;
1488    use std::collections::HashMap;
1489
1490    fn create_physical_sort_expr(
1491        e: &SortExpr,
1492        input_dfschema: &DFSchema,
1493        execution_props: &ExecutionProps,
1494    ) -> Result<PhysicalSortExpr> {
1495        let SortExpr {
1496            expr,
1497            asc,
1498            nulls_first,
1499        } = e;
1500        Ok(PhysicalSortExpr {
1501            expr: create_physical_expr(expr, input_dfschema, execution_props)?,
1502            options: SortOptions {
1503                descending: !asc,
1504                nulls_first: *nulls_first,
1505            },
1506        })
1507    }
1508
1509    /// Returns the column names on the schema
1510    pub fn columns(schema: &Schema) -> Vec<String> {
1511        schema.fields().iter().map(|f| f.name().clone()).collect()
1512    }
1513
1514    #[test]
1515    fn physical_plan_config_no_projection() {
1516        let file_schema = aggr_test_schema();
1517        let conf = config_for_projection(
1518            Arc::clone(&file_schema),
1519            None,
1520            Statistics::new_unknown(&file_schema),
1521            to_partition_cols(vec![(
1522                "date".to_owned(),
1523                wrap_partition_type_in_dict(DataType::Utf8),
1524            )]),
1525        );
1526
1527        let (proj_schema, _, proj_statistics, _) = conf.project();
1528        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1529        assert_eq!(
1530            proj_schema.field(file_schema.fields().len()).name(),
1531            "date",
1532            "partition columns are the last columns"
1533        );
1534        assert_eq!(
1535            proj_statistics.column_statistics.len(),
1536            file_schema.fields().len() + 1
1537        );
1538        // TODO implement tests for partition column statistics once implemented
1539
1540        let col_names = conf.projected_file_column_names();
1541        assert_eq!(col_names, None);
1542
1543        let col_indices = conf.file_column_projection_indices();
1544        assert_eq!(col_indices, None);
1545    }
1546
1547    #[test]
1548    fn physical_plan_config_no_projection_tab_cols_as_field() {
1549        let file_schema = aggr_test_schema();
1550
1551        // make a table_partition_col as a field
1552        let table_partition_col =
1553            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1554                .with_metadata(HashMap::from_iter(vec![(
1555                    "key_whatever".to_owned(),
1556                    "value_whatever".to_owned(),
1557                )]));
1558
1559        let conf = config_for_projection(
1560            Arc::clone(&file_schema),
1561            None,
1562            Statistics::new_unknown(&file_schema),
1563            vec![table_partition_col.clone()],
1564        );
1565
1566        // verify the proj_schema includes the last column and exactly the same the field it is defined
1567        let (proj_schema, _, _, _) = conf.project();
1568        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1569        assert_eq!(
1570            *proj_schema.field(file_schema.fields().len()),
1571            table_partition_col,
1572            "partition columns are the last columns and ust have all values defined in created field"
1573        );
1574    }
1575
1576    #[test]
1577    fn physical_plan_config_with_projection() {
1578        let file_schema = aggr_test_schema();
1579        let conf = config_for_projection(
1580            Arc::clone(&file_schema),
1581            Some(vec![file_schema.fields().len(), 0]),
1582            Statistics {
1583                num_rows: Precision::Inexact(10),
1584                // assign the column index to distinct_count to help assert
1585                // the source statistic after the projection
1586                column_statistics: (0..file_schema.fields().len())
1587                    .map(|i| ColumnStatistics {
1588                        distinct_count: Precision::Inexact(i),
1589                        ..Default::default()
1590                    })
1591                    .collect(),
1592                total_byte_size: Precision::Absent,
1593            },
1594            to_partition_cols(vec![(
1595                "date".to_owned(),
1596                wrap_partition_type_in_dict(DataType::Utf8),
1597            )]),
1598        );
1599
1600        let (proj_schema, _, proj_statistics, _) = conf.project();
1601        assert_eq!(
1602            columns(&proj_schema),
1603            vec!["date".to_owned(), "c1".to_owned()]
1604        );
1605        let proj_stat_cols = proj_statistics.column_statistics;
1606        assert_eq!(proj_stat_cols.len(), 2);
1607        // TODO implement tests for proj_stat_cols[0] once partition column
1608        // statistics are implemented
1609        assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));
1610
1611        let col_names = conf.projected_file_column_names();
1612        assert_eq!(col_names, Some(vec!["c1".to_owned()]));
1613
1614        let col_indices = conf.file_column_projection_indices();
1615        assert_eq!(col_indices, Some(vec![0]));
1616    }
1617
1618    #[test]
1619    fn partition_column_projector() {
1620        let file_batch = build_table_i32(
1621            ("a", &vec![0, 1, 2]),
1622            ("b", &vec![-2, -1, 0]),
1623            ("c", &vec![10, 11, 12]),
1624        );
1625        let partition_cols = vec![
1626            (
1627                "year".to_owned(),
1628                wrap_partition_type_in_dict(DataType::Utf8),
1629            ),
1630            (
1631                "month".to_owned(),
1632                wrap_partition_type_in_dict(DataType::Utf8),
1633            ),
1634            (
1635                "day".to_owned(),
1636                wrap_partition_type_in_dict(DataType::Utf8),
1637            ),
1638        ];
1639        // create a projected schema
1640        let statistics = Statistics {
1641            num_rows: Precision::Inexact(3),
1642            total_byte_size: Precision::Absent,
1643            column_statistics: Statistics::unknown_column(&file_batch.schema()),
1644        };
1645
1646        let conf = config_for_projection(
1647            file_batch.schema(),
1648            // keep all cols from file and 2 from partitioning
1649            Some(vec![
1650                0,
1651                1,
1652                2,
1653                file_batch.schema().fields().len(),
1654                file_batch.schema().fields().len() + 2,
1655            ]),
1656            statistics.clone(),
1657            to_partition_cols(partition_cols.clone()),
1658        );
1659
1660        let source_statistics = conf.file_source.statistics().unwrap();
1661        let conf_stats = conf.statistics().unwrap();
1662
1663        // projection should be reflected in the file source statistics
1664        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
1665
1666        // 3 original statistics + 2 partition statistics
1667        assert_eq!(conf_stats.column_statistics.len(), 5);
1668
1669        // file statics should not be modified
1670        assert_eq!(source_statistics, statistics);
1671        assert_eq!(source_statistics.column_statistics.len(), 3);
1672
1673        let (proj_schema, ..) = conf.project();
1674        // created a projector for that projected schema
1675        let mut proj = PartitionColumnProjector::new(
1676            proj_schema,
1677            &partition_cols
1678                .iter()
1679                .map(|x| x.0.clone())
1680                .collect::<Vec<_>>(),
1681        );
1682
1683        // project first batch
1684        let projected_batch = proj
1685            .project(
1686                // file_batch is ok here because we kept all the file cols in the projection
1687                file_batch,
1688                &[
1689                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1690                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1691                    wrap_partition_value_in_dict(ScalarValue::from("26")),
1692                ],
1693            )
1694            .expect("Projection of partition columns into record batch failed");
1695        let expected = [
1696            "+---+----+----+------+-----+",
1697            "| a | b  | c  | year | day |",
1698            "+---+----+----+------+-----+",
1699            "| 0 | -2 | 10 | 2021 | 26  |",
1700            "| 1 | -1 | 11 | 2021 | 26  |",
1701            "| 2 | 0  | 12 | 2021 | 26  |",
1702            "+---+----+----+------+-----+",
1703        ];
1704        assert_batches_eq!(expected, &[projected_batch]);
1705
1706        // project another batch that is larger than the previous one
1707        let file_batch = build_table_i32(
1708            ("a", &vec![5, 6, 7, 8, 9]),
1709            ("b", &vec![-10, -9, -8, -7, -6]),
1710            ("c", &vec![12, 13, 14, 15, 16]),
1711        );
1712        let projected_batch = proj
1713            .project(
1714                // file_batch is ok here because we kept all the file cols in the projection
1715                file_batch,
1716                &[
1717                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1718                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1719                    wrap_partition_value_in_dict(ScalarValue::from("27")),
1720                ],
1721            )
1722            .expect("Projection of partition columns into record batch failed");
1723        let expected = [
1724            "+---+-----+----+------+-----+",
1725            "| a | b   | c  | year | day |",
1726            "+---+-----+----+------+-----+",
1727            "| 5 | -10 | 12 | 2021 | 27  |",
1728            "| 6 | -9  | 13 | 2021 | 27  |",
1729            "| 7 | -8  | 14 | 2021 | 27  |",
1730            "| 8 | -7  | 15 | 2021 | 27  |",
1731            "| 9 | -6  | 16 | 2021 | 27  |",
1732            "+---+-----+----+------+-----+",
1733        ];
1734        assert_batches_eq!(expected, &[projected_batch]);
1735
1736        // project another batch that is smaller than the previous one
1737        let file_batch = build_table_i32(
1738            ("a", &vec![0, 1, 3]),
1739            ("b", &vec![2, 3, 4]),
1740            ("c", &vec![4, 5, 6]),
1741        );
1742        let projected_batch = proj
1743            .project(
1744                // file_batch is ok here because we kept all the file cols in the projection
1745                file_batch,
1746                &[
1747                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
1748                    wrap_partition_value_in_dict(ScalarValue::from("10")),
1749                    wrap_partition_value_in_dict(ScalarValue::from("28")),
1750                ],
1751            )
1752            .expect("Projection of partition columns into record batch failed");
1753        let expected = [
1754            "+---+---+---+------+-----+",
1755            "| a | b | c | year | day |",
1756            "+---+---+---+------+-----+",
1757            "| 0 | 2 | 4 | 2021 | 28  |",
1758            "| 1 | 3 | 5 | 2021 | 28  |",
1759            "| 3 | 4 | 6 | 2021 | 28  |",
1760            "+---+---+---+------+-----+",
1761        ];
1762        assert_batches_eq!(expected, &[projected_batch]);
1763
1764        // forgot to dictionary-wrap the scalar value
1765        let file_batch = build_table_i32(
1766            ("a", &vec![0, 1, 2]),
1767            ("b", &vec![-2, -1, 0]),
1768            ("c", &vec![10, 11, 12]),
1769        );
1770        let projected_batch = proj
1771            .project(
1772                // file_batch is ok here because we kept all the file cols in the projection
1773                file_batch,
1774                &[
1775                    ScalarValue::from("2021"),
1776                    ScalarValue::from("10"),
1777                    ScalarValue::from("26"),
1778                ],
1779            )
1780            .expect("Projection of partition columns into record batch failed");
1781        let expected = [
1782            "+---+----+----+------+-----+",
1783            "| a | b  | c  | year | day |",
1784            "+---+----+----+------+-----+",
1785            "| 0 | -2 | 10 | 2021 | 26  |",
1786            "| 1 | -1 | 11 | 2021 | 26  |",
1787            "| 2 | 0  | 12 | 2021 | 26  |",
1788            "+---+----+----+------+-----+",
1789        ];
1790        assert_batches_eq!(expected, &[projected_batch]);
1791    }
1792
1793    #[test]
1794    fn test_projected_file_schema_with_partition_col() {
1795        let schema = aggr_test_schema();
1796        let partition_cols = vec![
1797            (
1798                "part1".to_owned(),
1799                wrap_partition_type_in_dict(DataType::Utf8),
1800            ),
1801            (
1802                "part2".to_owned(),
1803                wrap_partition_type_in_dict(DataType::Utf8),
1804            ),
1805        ];
1806
1807        // Projected file schema for config with projection including partition column
1808        let projection = config_for_projection(
1809            schema.clone(),
1810            Some(vec![0, 3, 5, schema.fields().len()]),
1811            Statistics::new_unknown(&schema),
1812            to_partition_cols(partition_cols),
1813        )
1814        .projected_file_schema();
1815
1816        // Assert partition column filtered out in projected file schema
1817        let expected_columns = vec!["c1", "c4", "c6"];
1818        let actual_columns = projection
1819            .fields()
1820            .iter()
1821            .map(|f| f.name().clone())
1822            .collect::<Vec<_>>();
1823        assert_eq!(expected_columns, actual_columns);
1824    }
1825
1826    #[test]
1827    fn test_projected_file_schema_without_projection() {
1828        let schema = aggr_test_schema();
1829        let partition_cols = vec![
1830            (
1831                "part1".to_owned(),
1832                wrap_partition_type_in_dict(DataType::Utf8),
1833            ),
1834            (
1835                "part2".to_owned(),
1836                wrap_partition_type_in_dict(DataType::Utf8),
1837            ),
1838        ];
1839
1840        // Projected file schema for config without projection
1841        let projection = config_for_projection(
1842            schema.clone(),
1843            None,
1844            Statistics::new_unknown(&schema),
1845            to_partition_cols(partition_cols),
1846        )
1847        .projected_file_schema();
1848
1849        // Assert projected file schema is equal to file schema
1850        assert_eq!(projection.fields(), schema.fields());
1851    }
1852
1853    #[test]
1854    fn test_split_groups_by_statistics() -> Result<()> {
1855        use chrono::TimeZone;
1856        use datafusion_common::DFSchema;
1857        use datafusion_expr::execution_props::ExecutionProps;
1858        use object_store::{path::Path, ObjectMeta};
1859
1860        struct File {
1861            name: &'static str,
1862            date: &'static str,
1863            statistics: Vec<Option<(f64, f64)>>,
1864        }
1865        impl File {
1866            fn new(
1867                name: &'static str,
1868                date: &'static str,
1869                statistics: Vec<Option<(f64, f64)>>,
1870            ) -> Self {
1871                Self {
1872                    name,
1873                    date,
1874                    statistics,
1875                }
1876            }
1877        }
1878
1879        struct TestCase {
1880            name: &'static str,
1881            file_schema: Schema,
1882            files: Vec<File>,
1883            sort: Vec<SortExpr>,
1884            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1885        }
1886
1887        use datafusion_expr::col;
1888        let cases = vec![
1889            TestCase {
1890                name: "test sort",
1891                file_schema: Schema::new(vec![Field::new(
1892                    "value".to_string(),
1893                    DataType::Float64,
1894                    false,
1895                )]),
1896                files: vec![
1897                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1898                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1899                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1900                ],
1901                sort: vec![col("value").sort(true, false)],
1902                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1903            },
1904            // same input but file '2' is in the middle
1905            // test that we still order correctly
1906            TestCase {
1907                name: "test sort with files ordered differently",
1908                file_schema: Schema::new(vec![Field::new(
1909                    "value".to_string(),
1910                    DataType::Float64,
1911                    false,
1912                )]),
1913                files: vec![
1914                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1915                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1916                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1917                ],
1918                sort: vec![col("value").sort(true, false)],
1919                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1920            },
1921            TestCase {
1922                name: "reverse sort",
1923                file_schema: Schema::new(vec![Field::new(
1924                    "value".to_string(),
1925                    DataType::Float64,
1926                    false,
1927                )]),
1928                files: vec![
1929                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1930                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1931                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1932                ],
1933                sort: vec![col("value").sort(false, true)],
1934                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1935            },
1936            // reject nullable sort columns
1937            TestCase {
1938                name: "no nullable sort columns",
1939                file_schema: Schema::new(vec![Field::new(
1940                    "value".to_string(),
1941                    DataType::Float64,
1942                    true, // should fail because nullable
1943                )]),
1944                files: vec![
1945                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1946                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1947                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1948                ],
1949                sort: vec![col("value").sort(true, false)],
1950                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
1951            },
1952            TestCase {
1953                name: "all three non-overlapping",
1954                file_schema: Schema::new(vec![Field::new(
1955                    "value".to_string(),
1956                    DataType::Float64,
1957                    false,
1958                )]),
1959                files: vec![
1960                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1961                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
1962                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
1963                ],
1964                sort: vec![col("value").sort(true, false)],
1965                expected_result: Ok(vec![vec!["0", "1", "2"]]),
1966            },
1967            TestCase {
1968                name: "all three overlapping",
1969                file_schema: Schema::new(vec![Field::new(
1970                    "value".to_string(),
1971                    DataType::Float64,
1972                    false,
1973                )]),
1974                files: vec![
1975                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1976                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1977                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
1978                ],
1979                sort: vec![col("value").sort(true, false)],
1980                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
1981            },
1982            TestCase {
1983                name: "empty input",
1984                file_schema: Schema::new(vec![Field::new(
1985                    "value".to_string(),
1986                    DataType::Float64,
1987                    false,
1988                )]),
1989                files: vec![],
1990                sort: vec![col("value").sort(true, false)],
1991                expected_result: Ok(vec![]),
1992            },
1993            TestCase {
1994                name: "one file missing statistics",
1995                file_schema: Schema::new(vec![Field::new(
1996                    "value".to_string(),
1997                    DataType::Float64,
1998                    false,
1999                )]),
2000                files: vec![
2001                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
2002                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
2003                    File::new("2", "2023-01-02", vec![None]),
2004                ],
2005                sort: vec![col("value").sort(true, false)],
2006                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
2007            },
2008        ];
2009
2010        for case in cases {
2011            let table_schema = Arc::new(Schema::new(
2012                case.file_schema
2013                    .fields()
2014                    .clone()
2015                    .into_iter()
2016                    .cloned()
2017                    .chain(Some(Arc::new(Field::new(
2018                        "date".to_string(),
2019                        DataType::Utf8,
2020                        false,
2021                    ))))
2022                    .collect::<Vec<_>>(),
2023            ));
2024            let sort_order = LexOrdering::from(
2025                case.sort
2026                    .into_iter()
2027                    .map(|expr| {
2028                        create_physical_sort_expr(
2029                            &expr,
2030                            &DFSchema::try_from(table_schema.as_ref().clone())?,
2031                            &ExecutionProps::default(),
2032                        )
2033                    })
2034                    .collect::<Result<Vec<_>>>()?,
2035            );
2036
2037            let partitioned_files = FileGroup::new(
2038                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
2039            );
2040            let result = FileScanConfig::split_groups_by_statistics(
2041                &table_schema,
2042                &[partitioned_files.clone()],
2043                &sort_order,
2044            );
2045            let results_by_name = result
2046                .as_ref()
2047                .map(|file_groups| {
2048                    file_groups
2049                        .iter()
2050                        .map(|file_group| {
2051                            file_group
2052                                .iter()
2053                                .map(|file| {
2054                                    partitioned_files
2055                                        .iter()
2056                                        .find_map(|f| {
2057                                            if f.object_meta == file.object_meta {
2058                                                Some(
2059                                                    f.object_meta
2060                                                        .location
2061                                                        .as_ref()
2062                                                        .rsplit('/')
2063                                                        .next()
2064                                                        .unwrap()
2065                                                        .trim_end_matches(".parquet"),
2066                                                )
2067                                            } else {
2068                                                None
2069                                            }
2070                                        })
2071                                        .unwrap()
2072                                })
2073                                .collect::<Vec<_>>()
2074                        })
2075                        .collect::<Vec<_>>()
2076                })
2077                .map_err(|e| e.strip_backtrace().leak() as &'static str);
2078
2079            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
2080        }
2081
2082        return Ok(());
2083
2084        impl From<File> for PartitionedFile {
2085            fn from(file: File) -> Self {
2086                PartitionedFile {
2087                    object_meta: ObjectMeta {
2088                        location: Path::from(format!(
2089                            "data/date={}/{}.parquet",
2090                            file.date, file.name
2091                        )),
2092                        last_modified: chrono::Utc.timestamp_nanos(0),
2093                        size: 0,
2094                        e_tag: None,
2095                        version: None,
2096                    },
2097                    partition_values: vec![ScalarValue::from(file.date)],
2098                    range: None,
2099                    statistics: Some(Arc::new(Statistics {
2100                        num_rows: Precision::Absent,
2101                        total_byte_size: Precision::Absent,
2102                        column_statistics: file
2103                            .statistics
2104                            .into_iter()
2105                            .map(|stats| {
2106                                stats
2107                                    .map(|(min, max)| ColumnStatistics {
2108                                        min_value: Precision::Exact(ScalarValue::from(
2109                                            min,
2110                                        )),
2111                                        max_value: Precision::Exact(ScalarValue::from(
2112                                            max,
2113                                        )),
2114                                        ..Default::default()
2115                                    })
2116                                    .unwrap_or_default()
2117                            })
2118                            .collect::<Vec<_>>(),
2119                    })),
2120                    extensions: None,
2121                    metadata_size_hint: None,
2122                }
2123            }
2124        }
2125    }
2126
2127    // sets default for configs that play no role in projections
2128    fn config_for_projection(
2129        file_schema: SchemaRef,
2130        projection: Option<Vec<usize>>,
2131        statistics: Statistics,
2132        table_partition_cols: Vec<Field>,
2133    ) -> FileScanConfig {
2134        FileScanConfigBuilder::new(
2135            ObjectStoreUrl::parse("test:///").unwrap(),
2136            file_schema,
2137            Arc::new(MockSource::default()),
2138        )
2139        .with_projection(projection)
2140        .with_statistics(statistics)
2141        .with_table_partition_cols(table_partition_cols)
2142        .build()
2143    }
2144
2145    /// Convert partition columns from Vec<String DataType> to Vec<Field>
2146    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
2147        table_partition_cols
2148            .iter()
2149            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
2150            .collect::<Vec<_>>()
2151    }
2152
2153    /// returns record batch with 3 columns of i32 in memory
2154    pub fn build_table_i32(
2155        a: (&str, &Vec<i32>),
2156        b: (&str, &Vec<i32>),
2157        c: (&str, &Vec<i32>),
2158    ) -> RecordBatch {
2159        let schema = Schema::new(vec![
2160            Field::new(a.0, DataType::Int32, false),
2161            Field::new(b.0, DataType::Int32, false),
2162            Field::new(c.0, DataType::Int32, false),
2163        ]);
2164
2165        RecordBatch::try_new(
2166            Arc::new(schema),
2167            vec![
2168                Arc::new(Int32Array::from(a.1.clone())),
2169                Arc::new(Int32Array::from(b.1.clone())),
2170                Arc::new(Int32Array::from(c.1.clone())),
2171            ],
2172        )
2173        .unwrap()
2174    }
2175
2176    #[test]
2177    fn test_file_scan_config_builder() {
2178        let file_schema = aggr_test_schema();
2179        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2180        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2181
2182        // Create a builder with required parameters
2183        let builder = FileScanConfigBuilder::new(
2184            object_store_url.clone(),
2185            Arc::clone(&file_schema),
2186            Arc::clone(&file_source),
2187        );
2188
2189        // Build with various configurations
2190        let config = builder
2191            .with_limit(Some(1000))
2192            .with_projection(Some(vec![0, 1]))
2193            .with_table_partition_cols(vec![Field::new(
2194                "date",
2195                wrap_partition_type_in_dict(DataType::Utf8),
2196                false,
2197            )])
2198            .with_constraints(Constraints::empty())
2199            .with_statistics(Statistics::new_unknown(&file_schema))
2200            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
2201                "test.parquet".to_string(),
2202                1024,
2203            )])])
2204            .with_output_ordering(vec![LexOrdering::default()])
2205            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
2206            .with_newlines_in_values(true)
2207            .build();
2208
2209        // Verify the built config has all the expected values
2210        assert_eq!(config.object_store_url, object_store_url);
2211        assert_eq!(config.file_schema, file_schema);
2212        assert_eq!(config.limit, Some(1000));
2213        assert_eq!(config.projection, Some(vec![0, 1]));
2214        assert_eq!(config.table_partition_cols.len(), 1);
2215        assert_eq!(config.table_partition_cols[0].name(), "date");
2216        assert_eq!(config.file_groups.len(), 1);
2217        assert_eq!(config.file_groups[0].len(), 1);
2218        assert_eq!(
2219            config.file_groups[0][0].object_meta.location.as_ref(),
2220            "test.parquet"
2221        );
2222        assert_eq!(
2223            config.file_compression_type,
2224            FileCompressionType::UNCOMPRESSED
2225        );
2226        assert!(config.new_lines_in_values);
2227        assert_eq!(config.output_ordering.len(), 1);
2228    }
2229
2230    #[test]
2231    fn test_file_scan_config_builder_defaults() {
2232        let file_schema = aggr_test_schema();
2233        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2234        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2235
2236        // Create a builder with only required parameters and build without any additional configurations
2237        let config = FileScanConfigBuilder::new(
2238            object_store_url.clone(),
2239            Arc::clone(&file_schema),
2240            Arc::clone(&file_source),
2241        )
2242        .build();
2243
2244        // Verify default values
2245        assert_eq!(config.object_store_url, object_store_url);
2246        assert_eq!(config.file_schema, file_schema);
2247        assert_eq!(config.limit, None);
2248        assert_eq!(config.projection, None);
2249        assert!(config.table_partition_cols.is_empty());
2250        assert!(config.file_groups.is_empty());
2251        assert_eq!(
2252            config.file_compression_type,
2253            FileCompressionType::UNCOMPRESSED
2254        );
2255        assert!(!config.new_lines_in_values);
2256        assert!(config.output_ordering.is_empty());
2257        assert!(config.constraints.is_empty());
2258
2259        // Verify statistics are set to unknown
2260        assert_eq!(
2261            config.file_source.statistics().unwrap().num_rows,
2262            Precision::Absent
2263        );
2264        assert_eq!(
2265            config.file_source.statistics().unwrap().total_byte_size,
2266            Precision::Absent
2267        );
2268        assert_eq!(
2269            config
2270                .file_source
2271                .statistics()
2272                .unwrap()
2273                .column_statistics
2274                .len(),
2275            file_schema.fields().len()
2276        );
2277        for stat in config.file_source.statistics().unwrap().column_statistics {
2278            assert_eq!(stat.distinct_count, Precision::Absent);
2279            assert_eq!(stat.min_value, Precision::Absent);
2280            assert_eq!(stat.max_value, Precision::Absent);
2281            assert_eq!(stat.null_count, Precision::Absent);
2282        }
2283    }
2284
2285    #[test]
2286    fn test_file_scan_config_builder_new_from() {
2287        let schema = aggr_test_schema();
2288        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2289        let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
2290        let partition_cols = vec![Field::new(
2291            "date",
2292            wrap_partition_type_in_dict(DataType::Utf8),
2293            false,
2294        )];
2295        let file = PartitionedFile::new("test_file.parquet", 100);
2296
2297        // Create a config with non-default values
2298        let original_config = FileScanConfigBuilder::new(
2299            object_store_url.clone(),
2300            Arc::clone(&schema),
2301            Arc::clone(&file_source),
2302        )
2303        .with_projection(Some(vec![0, 2]))
2304        .with_limit(Some(10))
2305        .with_table_partition_cols(partition_cols.clone())
2306        .with_file(file.clone())
2307        .with_constraints(Constraints::default())
2308        .with_newlines_in_values(true)
2309        .build();
2310
2311        // Create a new builder from the config
2312        let new_builder = FileScanConfigBuilder::from(original_config);
2313
2314        // Build a new config from this builder
2315        let new_config = new_builder.build();
2316
2317        // Verify properties match
2318        assert_eq!(new_config.object_store_url, object_store_url);
2319        assert_eq!(new_config.file_schema, schema);
2320        assert_eq!(new_config.projection, Some(vec![0, 2]));
2321        assert_eq!(new_config.limit, Some(10));
2322        assert_eq!(new_config.table_partition_cols, partition_cols);
2323        assert_eq!(new_config.file_groups.len(), 1);
2324        assert_eq!(new_config.file_groups[0].len(), 1);
2325        assert_eq!(
2326            new_config.file_groups[0][0].object_meta.location.as_ref(),
2327            "test_file.parquet"
2328        );
2329        assert_eq!(new_config.constraints, Constraints::default());
2330        assert!(new_config.new_lines_in_values);
2331    }
2332
2333    #[test]
2334    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2335        use datafusion_common::DFSchema;
2336        use datafusion_expr::{col, execution_props::ExecutionProps};
2337
2338        let schema = Arc::new(Schema::new(vec![Field::new(
2339            "value",
2340            DataType::Float64,
2341            false,
2342        )]));
2343
2344        // Setup sort expression
2345        let exec_props = ExecutionProps::new();
2346        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2347        let sort_expr = vec![col("value").sort(true, false)];
2348
2349        let physical_sort_exprs: Vec<_> = sort_expr
2350            .iter()
2351            .map(|expr| create_physical_sort_expr(expr, &df_schema, &exec_props).unwrap())
2352            .collect();
2353
2354        let sort_ordering = LexOrdering::from(physical_sort_exprs);
2355
2356        // Test case parameters
2357        struct TestCase {
2358            name: String,
2359            file_count: usize,
2360            overlap_factor: f64,
2361            target_partitions: usize,
2362            expected_partition_count: usize,
2363        }
2364
2365        let test_cases = vec![
2366            // Basic cases
2367            TestCase {
2368                name: "no_overlap_10_files_4_partitions".to_string(),
2369                file_count: 10,
2370                overlap_factor: 0.0,
2371                target_partitions: 4,
2372                expected_partition_count: 4,
2373            },
2374            TestCase {
2375                name: "medium_overlap_20_files_5_partitions".to_string(),
2376                file_count: 20,
2377                overlap_factor: 0.5,
2378                target_partitions: 5,
2379                expected_partition_count: 5,
2380            },
2381            TestCase {
2382                name: "high_overlap_30_files_3_partitions".to_string(),
2383                file_count: 30,
2384                overlap_factor: 0.8,
2385                target_partitions: 3,
2386                expected_partition_count: 7,
2387            },
2388            // Edge cases
2389            TestCase {
2390                name: "fewer_files_than_partitions".to_string(),
2391                file_count: 3,
2392                overlap_factor: 0.0,
2393                target_partitions: 10,
2394                expected_partition_count: 3, // Should only create as many partitions as files
2395            },
2396            TestCase {
2397                name: "single_file".to_string(),
2398                file_count: 1,
2399                overlap_factor: 0.0,
2400                target_partitions: 5,
2401                expected_partition_count: 1, // Should create only one partition
2402            },
2403            TestCase {
2404                name: "empty_files".to_string(),
2405                file_count: 0,
2406                overlap_factor: 0.0,
2407                target_partitions: 3,
2408                expected_partition_count: 0, // Empty result for empty input
2409            },
2410        ];
2411
2412        for case in test_cases {
2413            println!("Running test case: {}", case.name);
2414
2415            // Generate files using bench utility function
2416            let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2417
2418            // Call the function under test
2419            let result =
2420                FileScanConfig::split_groups_by_statistics_with_target_partitions(
2421                    &schema,
2422                    &file_groups,
2423                    &sort_ordering,
2424                    case.target_partitions,
2425                )?;
2426
2427            // Verify results
2428            println!(
2429                "Created {} partitions (target was {})",
2430                result.len(),
2431                case.target_partitions
2432            );
2433
2434            // Check partition count
2435            assert_eq!(
2436                result.len(),
2437                case.expected_partition_count,
2438                "Case '{}': Unexpected partition count",
2439                case.name
2440            );
2441
2442            // Verify sort integrity
2443            assert!(
2444                verify_sort_integrity(&result),
2445                "Case '{}': Files within partitions are not properly ordered",
2446                case.name
2447            );
2448
2449            // Distribution check for partitions
2450            if case.file_count > 1 && case.expected_partition_count > 1 {
2451                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2452                let max_size = *group_sizes.iter().max().unwrap();
2453                let min_size = *group_sizes.iter().min().unwrap();
2454
2455                // Check partition balancing - difference shouldn't be extreme
2456                let avg_files_per_partition =
2457                    case.file_count as f64 / case.expected_partition_count as f64;
2458                assert!(
2459                    (max_size as f64) < 2.0 * avg_files_per_partition,
2460                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2461                    case.name,
2462                    max_size,
2463                    avg_files_per_partition
2464                );
2465
2466                println!(
2467                    "Distribution - min files: {}, max files: {}",
2468                    min_size, max_size
2469                );
2470            }
2471        }
2472
2473        // Test error case: zero target partitions
2474        let empty_groups: Vec<FileGroup> = vec![];
2475        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2476            &schema,
2477            &empty_groups,
2478            &sort_ordering,
2479            0,
2480        )
2481        .unwrap_err();
2482
2483        assert!(
2484            err.to_string()
2485                .contains("target_partitions must be greater than 0"),
2486            "Expected error for zero target partitions"
2487        );
2488
2489        Ok(())
2490    }
2491}