datafusion_datasource/
file_scan_config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21use std::{
22    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
24};
25
26use arrow::{
27    array::{
28        ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
29        RecordBatchOptions,
30    },
31    buffer::Buffer,
32    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
33};
34use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics};
35use datafusion_common::{DataFusionError, ScalarValue};
36use datafusion_execution::{
37    object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
38};
39use datafusion_physical_expr::{
40    expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
41    PhysicalSortExpr,
42};
43use datafusion_physical_plan::{
44    display::{display_orderings, ProjectSchemaDisplay},
45    metrics::ExecutionPlanMetricsSet,
46    projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
47    DisplayAs, DisplayFormatType, ExecutionPlan,
48};
49use log::{debug, warn};
50
51use crate::{
52    display::FileGroupsDisplay,
53    file::FileSource,
54    file_compression_type::FileCompressionType,
55    file_stream::FileStream,
56    source::{DataSource, DataSourceExec},
57    statistics::MinMaxStatistics,
58    PartitionedFile,
59};
60
/// The base configuration for a [`DataSourceExec`], the physical plan for
/// any given file format.
///
/// Use [`Self::build`] to create a [`DataSourceExec`] from a `FileScanConfig`.
65///
66/// # Example
67/// ```
68/// # use std::any::Any;
69/// # use std::sync::Arc;
70/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
71/// # use object_store::ObjectStore;
72/// # use datafusion_common::Statistics;
73/// # use datafusion_datasource::file::FileSource;
74/// # use datafusion_datasource::PartitionedFile;
75/// # use datafusion_datasource::file_scan_config::FileScanConfig;
76/// # use datafusion_datasource::file_stream::FileOpener;
77/// # use datafusion_execution::object_store::ObjectStoreUrl;
78/// # use datafusion_physical_plan::ExecutionPlan;
79/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
80/// # let file_schema = Arc::new(Schema::new(vec![
81/// #  Field::new("c1", DataType::Int32, false),
82/// #  Field::new("c2", DataType::Int32, false),
83/// #  Field::new("c3", DataType::Int32, false),
84/// #  Field::new("c4", DataType::Int32, false),
85/// # ]));
86/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
87/// # struct ParquetSource {
88/// #    projected_statistics: Option<Statistics>
89/// # };
90/// # impl FileSource for ParquetSource {
91/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
92/// #  fn as_any(&self) -> &dyn Any { self  }
93/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
94/// #  fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { unimplemented!() }
95/// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
96/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics)} ) }
97/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
98/// #  fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
99/// #  fn file_type(&self) -> &str { "parquet" }
100/// #  }
101/// # impl ParquetSource {
102/// #  fn new() -> Self { Self {projected_statistics: None} }
103/// # }
104/// // create FileScan config for reading parquet files from file://
105/// let object_store_url = ObjectStoreUrl::local_filesystem();
106/// let file_source = Arc::new(ParquetSource::new());
107/// let config = FileScanConfig::new(object_store_url, file_schema, file_source)
108///   .with_limit(Some(1000))            // read only the first 1000 records
109///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
110///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
111///   .with_file(PartitionedFile::new("file1.parquet", 1234))
///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
///   // in a single file group
114///   .with_file_group(vec![
115///    PartitionedFile::new("file2.parquet", 56),
116///    PartitionedFile::new("file3.parquet", 78),
117///   ]);
118/// // create an execution plan from the config
119/// let plan: Arc<dyn ExecutionPlan> = config.build();
120/// ```
#[derive(Clone)]
pub struct FileScanConfig {
    /// Object store URL, used to get an [`ObjectStore`] instance from
    /// [`RuntimeEnv::object_store`]
    ///
    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
    /// as `file://` or `s3://my_bucket`. It should not include the path to the
    /// file itself. The relevant URL prefix must be registered via
    /// [`RuntimeEnv::register_object_store`]
    ///
    /// [`ObjectStore`]: object_store::ObjectStore
    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
    pub object_store_url: ObjectStoreUrl,
    /// Schema before `projection` is applied. It contains all the columns that
    /// may appear in the files. It does not include table partition columns
    /// that may be added.
    pub file_schema: SchemaRef,
    /// List of files to be processed, grouped into partitions
    ///
    /// Each file must have a schema of `file_schema` or a subset. If
    /// a particular file has a subset, the missing columns are
    /// padded with NULLs.
    ///
    /// DataFusion may attempt to read each partition of files
    /// concurrently, however files *within* a partition will be read
    /// sequentially, one after the next.
    pub file_groups: Vec<Vec<PartitionedFile>>,
    /// Table constraints
    pub constraints: Constraints,
    /// Estimated overall statistics of the files, taking `filters` into account.
    /// Defaults to [`Statistics::new_unknown`].
    pub statistics: Statistics,
    /// Columns on which to project the data. Indexes greater than or equal to
    /// the number of columns of `file_schema` refer to `table_partition_cols`
    /// (offset by the number of file columns). `None` selects every file
    /// column followed by every partition column.
    pub projection: Option<Vec<usize>>,
    /// The maximum number of records to read from this plan. If `None`,
    /// all records after filtering are returned.
    pub limit: Option<usize>,
    /// The partitioning columns
    pub table_partition_cols: Vec<Field>,
    /// All equivalent lexicographical orderings that describe the schema.
    pub output_ordering: Vec<LexOrdering>,
    /// File compression type
    pub file_compression_type: FileCompressionType,
    /// Whether new lines inside values are supported (applies to CSVOptions)
    pub new_lines_in_values: bool,
    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
    pub file_source: Arc<dyn FileSource>,
}
171
172impl DataSource for FileScanConfig {
173    fn open(
174        &self,
175        partition: usize,
176        context: Arc<TaskContext>,
177    ) -> Result<SendableRecordBatchStream> {
178        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
179
180        let source = self
181            .file_source
182            .with_batch_size(context.session_config().batch_size())
183            .with_schema(Arc::clone(&self.file_schema))
184            .with_projection(self);
185
186        let opener = source.create_file_opener(object_store, self, partition);
187
188        let stream = FileStream::new(self, partition, opener, source.metrics())?;
189        Ok(Box::pin(stream))
190    }
191
    /// Returns `self` as [`Any`] so callers can downcast to `FileScanConfig`.
    fn as_any(&self) -> &dyn Any {
        self
    }
195
196    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
197        let (schema, _, _, orderings) = self.project();
198
199        write!(f, "file_groups=")?;
200        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
201
202        if !schema.fields().is_empty() {
203            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
204        }
205
206        if let Some(limit) = self.limit {
207            write!(f, ", limit={limit}")?;
208        }
209
210        display_orderings(f, &orderings)?;
211
212        if !self.constraints.is_empty() {
213            write!(f, ", {}", self.constraints)?;
214        }
215
216        self.fmt_file_source(t, f)
217    }
218
219    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
220    fn repartitioned(
221        &self,
222        target_partitions: usize,
223        repartition_file_min_size: usize,
224        output_ordering: Option<LexOrdering>,
225    ) -> Result<Option<Arc<dyn DataSource>>> {
226        let source = self.file_source.repartitioned(
227            target_partitions,
228            repartition_file_min_size,
229            output_ordering,
230            self,
231        )?;
232
233        Ok(source.map(|s| Arc::new(s) as _))
234    }
235
    /// Reports unknown partitioning with one output partition per file group.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.file_groups.len())
    }
239
240    fn eq_properties(&self) -> EquivalenceProperties {
241        let (schema, constraints, _, orderings) = self.project();
242        EquivalenceProperties::new_with_orderings(schema, orderings.as_slice())
243            .with_constraints(constraints)
244    }
245
    /// Returns the statistics restricted to the projected columns.
    fn statistics(&self) -> Result<Statistics> {
        Ok(self.projected_stats())
    }
249
250    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
251        let source = self.clone();
252        Some(Arc::new(source.with_limit(limit)))
253    }
254
    /// Returns the configured row limit, if any.
    fn fetch(&self) -> Option<usize> {
        self.limit
    }
258
    /// Returns a clone of the metrics set owned by the file source.
    fn metrics(&self) -> ExecutionPlanMetricsSet {
        self.file_source.metrics().clone()
    }
262
263    fn try_swapping_with_projection(
264        &self,
265        projection: &ProjectionExec,
266    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
267        // If there is any non-column or alias-carrier expression, Projection should not be removed.
268        // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
269
270        let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
271            expr.as_any()
272                .downcast_ref::<Column>()
273                .map(|expr| expr.index() >= self.file_schema.fields().len())
274                .unwrap_or(false)
275        });
276
277        Ok(
278            (all_alias_free_columns(projection.expr()) && !partitioned_columns_in_proj)
279                .then(|| {
280                    let file_scan = self.clone();
281                    let source = Arc::clone(&file_scan.file_source);
282                    let new_projections = new_projections_for_columns(
283                        projection,
284                        &file_scan
285                            .projection
286                            .clone()
287                            .unwrap_or((0..self.file_schema.fields().len()).collect()),
288                    );
289                    file_scan
290                        // Assign projected statistics to source
291                        .with_projection(Some(new_projections))
292                        .with_source(source)
293                        .build() as _
294                }),
295        )
296    }
297}
298
299impl FileScanConfig {
300    /// Create a new [`FileScanConfig`] with default settings for scanning files.
301    ///
302    /// See example on [`FileScanConfig`]
303    ///
304    /// No file groups are added by default. See [`Self::with_file`], [`Self::with_file_group`] and
305    /// [`Self::with_file_groups`].
306    ///
307    /// # Parameters:
308    /// * `object_store_url`: See [`Self::object_store_url`]
309    /// * `file_schema`: See [`Self::file_schema`]
310    pub fn new(
311        object_store_url: ObjectStoreUrl,
312        file_schema: SchemaRef,
313        file_source: Arc<dyn FileSource>,
314    ) -> Self {
315        let statistics = Statistics::new_unknown(&file_schema);
316
317        let mut config = Self {
318            object_store_url,
319            file_schema,
320            file_groups: vec![],
321            constraints: Constraints::empty(),
322            statistics,
323            projection: None,
324            limit: None,
325            table_partition_cols: vec![],
326            output_ordering: vec![],
327            file_compression_type: FileCompressionType::UNCOMPRESSED,
328            new_lines_in_values: false,
329            file_source: Arc::clone(&file_source),
330        };
331
332        config = config.with_source(Arc::clone(&file_source));
333        config
334    }
335
336    /// Set the file source
337    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
338        self.file_source = file_source.with_statistics(self.statistics.clone());
339        self
340    }
341
342    /// Set the table constraints of the files
343    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
344        self.constraints = constraints;
345        self
346    }
347
348    /// Set the statistics of the files
349    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
350        self.statistics = statistics.clone();
351        self.file_source = self.file_source.with_statistics(statistics);
352        self
353    }
354
355    fn projection_indices(&self) -> Vec<usize> {
356        match &self.projection {
357            Some(proj) => proj.clone(),
358            None => (0..self.file_schema.fields().len()
359                + self.table_partition_cols.len())
360                .collect(),
361        }
362    }
363
364    fn projected_stats(&self) -> Statistics {
365        let statistics = self
366            .file_source
367            .statistics()
368            .unwrap_or(self.statistics.clone());
369
370        let table_cols_stats = self
371            .projection_indices()
372            .into_iter()
373            .map(|idx| {
374                if idx < self.file_schema.fields().len() {
375                    statistics.column_statistics[idx].clone()
376                } else {
377                    // TODO provide accurate stat for partition column (#1186)
378                    ColumnStatistics::new_unknown()
379                }
380            })
381            .collect();
382
383        Statistics {
384            num_rows: statistics.num_rows,
385            // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
386            total_byte_size: statistics.total_byte_size,
387            column_statistics: table_cols_stats,
388        }
389    }
390
391    fn projected_schema(&self) -> Arc<Schema> {
392        let table_fields: Vec<_> = self
393            .projection_indices()
394            .into_iter()
395            .map(|idx| {
396                if idx < self.file_schema.fields().len() {
397                    self.file_schema.field(idx).clone()
398                } else {
399                    let partition_idx = idx - self.file_schema.fields().len();
400                    self.table_partition_cols[partition_idx].clone()
401                }
402            })
403            .collect();
404
405        Arc::new(Schema::new_with_metadata(
406            table_fields,
407            self.file_schema.metadata().clone(),
408        ))
409    }
410
411    fn projected_constraints(&self) -> Constraints {
412        let indexes = self.projection_indices();
413
414        self.constraints
415            .project(&indexes)
416            .unwrap_or_else(Constraints::empty)
417    }
418
419    /// Set the projection of the files
420    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
421        self.projection = projection;
422        self
423    }
424
425    /// Set the limit of the files
426    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
427        self.limit = limit;
428        self
429    }
430
431    /// Add a file as a single group
432    ///
433    /// See [Self::file_groups] for more information.
434    pub fn with_file(self, file: PartitionedFile) -> Self {
435        self.with_file_group(vec![file])
436    }
437
438    /// Add the file groups
439    ///
440    /// See [Self::file_groups] for more information.
441    pub fn with_file_groups(
442        mut self,
443        mut file_groups: Vec<Vec<PartitionedFile>>,
444    ) -> Self {
445        self.file_groups.append(&mut file_groups);
446        self
447    }
448
    /// Add a new file group after the existing ones
    ///
    /// See [Self::file_groups] for more information
    pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self {
        self.file_groups.push(file_group);
        self
    }
456
457    /// Set the partitioning columns of the files
458    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
459        self.table_partition_cols = table_partition_cols;
460        self
461    }
462
463    /// Set the output ordering of the files
464    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
465        self.output_ordering = output_ordering;
466        self
467    }
468
469    /// Set the file compression type
470    pub fn with_file_compression_type(
471        mut self,
472        file_compression_type: FileCompressionType,
473    ) -> Self {
474        self.file_compression_type = file_compression_type;
475        self
476    }
477
478    /// Set the new_lines_in_values property
479    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
480        self.new_lines_in_values = new_lines_in_values;
481        self
482    }
483
    /// Specifies whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
    ///
    /// Returns the value of the `new_lines_in_values` field.
    pub fn newlines_in_values(&self) -> bool {
        self.new_lines_in_values
    }
494
495    /// Project the schema, constraints, and the statistics on the given column indices
496    pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
497        if self.projection.is_none() && self.table_partition_cols.is_empty() {
498            return (
499                Arc::clone(&self.file_schema),
500                self.constraints.clone(),
501                self.statistics.clone(),
502                self.output_ordering.clone(),
503            );
504        }
505
506        let schema = self.projected_schema();
507        let constraints = self.projected_constraints();
508        let stats = self.projected_stats();
509
510        let output_ordering = get_projected_output_ordering(self, &schema);
511
512        (schema, constraints, stats, output_ordering)
513    }
514
515    #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
516    pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
517        self.projection.as_ref().map(|p| {
518            p.iter()
519                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
520                .map(|col_idx| self.file_schema.field(*col_idx).name())
521                .cloned()
522                .collect()
523        })
524    }
525
526    /// Projects only file schema, ignoring partition columns
527    pub fn projected_file_schema(&self) -> SchemaRef {
528        let fields = self.file_column_projection_indices().map(|indices| {
529            indices
530                .iter()
531                .map(|col_idx| self.file_schema.field(*col_idx))
532                .cloned()
533                .collect::<Vec<_>>()
534        });
535
536        fields.map_or_else(
537            || Arc::clone(&self.file_schema),
538            |f| {
539                Arc::new(Schema::new_with_metadata(
540                    f,
541                    self.file_schema.metadata.clone(),
542                ))
543            },
544        )
545    }
546
547    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
548        self.projection.as_ref().map(|p| {
549            p.iter()
550                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
551                .copied()
552                .collect()
553        })
554    }
555
556    /// Attempts to do a bin-packing on files into file groups, such that any two files
557    /// in a file group are ordered and non-overlapping with respect to their statistics.
558    /// It will produce the smallest number of file groups possible.
559    pub fn split_groups_by_statistics(
560        table_schema: &SchemaRef,
561        file_groups: &[Vec<PartitionedFile>],
562        sort_order: &LexOrdering,
563    ) -> Result<Vec<Vec<PartitionedFile>>> {
564        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
565        // First Fit:
566        // * Choose the first file group that a file can be placed into.
567        // * If it fits into no existing file groups, create a new one.
568        //
569        // By sorting files by min values and then applying first-fit bin packing,
570        // we can produce the smallest number of file groups such that
571        // files within a group are in order and non-overlapping.
572        //
573        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
574        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
575
576        if flattened_files.is_empty() {
577            return Ok(vec![]);
578        }
579
580        let statistics = MinMaxStatistics::new_from_files(
581            sort_order,
582            table_schema,
583            None,
584            flattened_files.iter().copied(),
585        )
586        .map_err(|e| {
587            e.context("construct min/max statistics for split_groups_by_statistics")
588        })?;
589
590        let indices_sorted_by_min = statistics.min_values_sorted();
591        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
592
593        for (idx, min) in indices_sorted_by_min {
594            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
595                // If our file is non-overlapping and comes _after_ the last file,
596                // it fits in this file group.
597                min > statistics.max(
598                    *group
599                        .last()
600                        .expect("groups should be nonempty at construction"),
601                )
602            });
603            match file_group_to_insert {
604                Some(group) => group.push(idx),
605                None => file_groups_indices.push(vec![idx]),
606            }
607        }
608
609        // Assemble indices back into groups of PartitionedFiles
610        Ok(file_groups_indices
611            .into_iter()
612            .map(|file_group_indices| {
613                file_group_indices
614                    .into_iter()
615                    .map(|idx| flattened_files[idx].clone())
616                    .collect()
617            })
618            .collect())
619    }
620
    // TODO: This function should be moved into DataSourceExec once FileScanConfig moved out of datafusion/core
    /// Returns a new [`DataSourceExec`] to scan the files specified by this config
    ///
    /// The config is consumed and becomes the data source of the returned plan.
    pub fn build(self) -> Arc<DataSourceExec> {
        Arc::new(DataSourceExec::new(Arc::new(self)))
    }
626
    /// Writes `, file_type=<type>` for the file source, followed by whatever
    /// extra details the file source adds through `fmt_extra`.
    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
        write!(f, ", file_type={}", self.file_source.file_type())?;
        self.file_source.fmt_extra(t, f)
    }
632
    /// Returns a reference to the file source of this config
    pub fn file_source(&self) -> &Arc<dyn FileSource> {
        &self.file_source
    }
637}
638
639impl Debug for FileScanConfig {
640    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
641        write!(f, "FileScanConfig {{")?;
642        write!(f, "object_store_url={:?}, ", self.object_store_url)?;
643
644        write!(f, "statistics={:?}, ", self.statistics)?;
645
646        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
647        write!(f, "}}")
648    }
649}
650
651impl DisplayAs for FileScanConfig {
652    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
653        let (schema, _, _, orderings) = self.project();
654
655        write!(f, "file_groups=")?;
656        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
657
658        if !schema.fields().is_empty() {
659            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
660        }
661
662        if let Some(limit) = self.limit {
663            write!(f, ", limit={limit}")?;
664        }
665
666        display_orderings(f, &orderings)?;
667
668        if !self.constraints.is_empty() {
669            write!(f, ", {}", self.constraints)?;
670        }
671
672        Ok(())
673    }
674}
675
/// A helper that projects partition columns into the file record batches.
///
/// One interesting trick is the usage of a cache for the key buffers of the partition column
/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
pub struct PartitionColumnProjector {
    /// An Arrow buffer initialized to zeros that represents the key array of all partition
    /// columns (partition columns are materialized by dictionary arrays with only one
    /// value in the dictionary, thus all the keys are equal to zero).
    key_buffer_cache: ZeroBufferGenerators,
    /// Pairs of `(index in the list of partition columns, index in the target
    /// schema)`. Sorted by index in the target schema so that we can iterate on
    /// it to insert the partition columns in the target record batch.
    projected_partition_indexes: Vec<(usize, usize)>,
    /// The schema of the table once the projection was applied.
    projected_schema: SchemaRef,
}
694
695impl PartitionColumnProjector {
696    // Create a projector to insert the partitioning columns into batches read from files
697    // - `projected_schema`: the target schema with both file and partitioning columns
698    // - `table_partition_cols`: all the partitioning column names
699    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
700        let mut idx_map = HashMap::new();
701        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
702            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
703                idx_map.insert(partition_idx, schema_idx);
704            }
705        }
706
707        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
708        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
709
710        Self {
711            projected_partition_indexes,
712            key_buffer_cache: Default::default(),
713            projected_schema,
714        }
715    }
716
717    // Transform the batch read from the file by inserting the partitioning columns
718    // to the right positions as deduced from `projected_schema`
719    // - `file_batch`: batch read from the file, with internal projection applied
720    // - `partition_values`: the list of partition values, one for each partition column
721    pub fn project(
722        &mut self,
723        file_batch: RecordBatch,
724        partition_values: &[ScalarValue],
725    ) -> Result<RecordBatch> {
726        let expected_cols =
727            self.projected_schema.fields().len() - self.projected_partition_indexes.len();
728
729        if file_batch.columns().len() != expected_cols {
730            return exec_err!(
731                "Unexpected batch schema from file, expected {} cols but got {}",
732                expected_cols,
733                file_batch.columns().len()
734            );
735        }
736
737        let mut cols = file_batch.columns().to_vec();
738        for &(pidx, sidx) in &self.projected_partition_indexes {
739            let p_value =
740                partition_values
741                    .get(pidx)
742                    .ok_or(DataFusionError::Execution(
743                        "Invalid partitioning found on disk".to_string(),
744                    ))?;
745
746            let mut partition_value = Cow::Borrowed(p_value);
747
748            // check if user forgot to dict-encode the partition value
749            let field = self.projected_schema.field(sidx);
750            let expected_data_type = field.data_type();
751            let actual_data_type = partition_value.data_type();
752            if let DataType::Dictionary(key_type, _) = expected_data_type {
753                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
754                    warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
755                    partition_value = Cow::Owned(ScalarValue::Dictionary(
756                        key_type.clone(),
757                        Box::new(partition_value.as_ref().clone()),
758                    ));
759                }
760            }
761
762            cols.insert(
763                sidx,
764                create_output_array(
765                    &mut self.key_buffer_cache,
766                    partition_value.as_ref(),
767                    file_batch.num_rows(),
768                )?,
769            )
770        }
771
772        RecordBatch::try_new_with_options(
773            Arc::clone(&self.projected_schema),
774            cols,
775            &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
776        )
777        .map_err(Into::into)
778    }
779}
780
/// One [`ZeroBufferGenerator`] per integer native type, so that each key
/// width has its own cached all-zero buffer.
#[derive(Debug, Default)]
struct ZeroBufferGenerators {
    gen_i8: ZeroBufferGenerator<i8>,
    gen_i16: ZeroBufferGenerator<i16>,
    gen_i32: ZeroBufferGenerator<i32>,
    gen_i64: ZeroBufferGenerator<i64>,
    gen_u8: ZeroBufferGenerator<u8>,
    gen_u16: ZeroBufferGenerator<u16>,
    gen_u32: ZeroBufferGenerator<u32>,
    gen_u64: ZeroBufferGenerator<u64>,
}
792
/// Generates an arrow [`Buffer`] that contains zero values of type `T`.
///
/// The most recently built buffer is kept so that later requests of the
/// same or a smaller size can be served by slicing it.
#[derive(Debug, Default)]
struct ZeroBufferGenerator<T>
where
    T: ArrowNativeType,
{
    /// Last buffer built by `get_buffer`; `None` until the first request.
    cache: Option<Buffer>,
    /// Ties the generator to the native type `T` without storing a value.
    _t: PhantomData<T>,
}
802
803impl<T> ZeroBufferGenerator<T>
804where
805    T: ArrowNativeType,
806{
807    const SIZE: usize = size_of::<T>();
808
809    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
810        match &mut self.cache {
811            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
812                buf.slice_with_length(0, n_vals * Self::SIZE)
813            }
814            _ => {
815                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
816                key_buffer_builder.advance(n_vals); // keys are all 0
817                self.cache.insert(key_buffer_builder.finish()).clone()
818            }
819        }
820    }
821}
822
823fn create_dict_array<T>(
824    buffer_gen: &mut ZeroBufferGenerator<T>,
825    dict_val: &ScalarValue,
826    len: usize,
827    data_type: DataType,
828) -> Result<ArrayRef>
829where
830    T: ArrowNativeType,
831{
832    let dict_vals = dict_val.to_array()?;
833
834    let sliced_key_buffer = buffer_gen.get_buffer(len);
835
836    // assemble pieces together
837    let mut builder = ArrayData::builder(data_type)
838        .len(len)
839        .add_buffer(sliced_key_buffer);
840    builder = builder.add_child_data(dict_vals.to_data());
841    Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
842        builder.build().unwrap(),
843    )))
844}
845
846fn create_output_array(
847    key_buffer_cache: &mut ZeroBufferGenerators,
848    val: &ScalarValue,
849    len: usize,
850) -> Result<ArrayRef> {
851    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
852        match key_type.as_ref() {
853            DataType::Int8 => {
854                return create_dict_array(
855                    &mut key_buffer_cache.gen_i8,
856                    dict_val,
857                    len,
858                    val.data_type(),
859                );
860            }
861            DataType::Int16 => {
862                return create_dict_array(
863                    &mut key_buffer_cache.gen_i16,
864                    dict_val,
865                    len,
866                    val.data_type(),
867                );
868            }
869            DataType::Int32 => {
870                return create_dict_array(
871                    &mut key_buffer_cache.gen_i32,
872                    dict_val,
873                    len,
874                    val.data_type(),
875                );
876            }
877            DataType::Int64 => {
878                return create_dict_array(
879                    &mut key_buffer_cache.gen_i64,
880                    dict_val,
881                    len,
882                    val.data_type(),
883                );
884            }
885            DataType::UInt8 => {
886                return create_dict_array(
887                    &mut key_buffer_cache.gen_u8,
888                    dict_val,
889                    len,
890                    val.data_type(),
891                );
892            }
893            DataType::UInt16 => {
894                return create_dict_array(
895                    &mut key_buffer_cache.gen_u16,
896                    dict_val,
897                    len,
898                    val.data_type(),
899                );
900            }
901            DataType::UInt32 => {
902                return create_dict_array(
903                    &mut key_buffer_cache.gen_u32,
904                    dict_val,
905                    len,
906                    val.data_type(),
907                );
908            }
909            DataType::UInt64 => {
910                return create_dict_array(
911                    &mut key_buffer_cache.gen_u64,
912                    dict_val,
913                    len,
914                    val.data_type(),
915                );
916            }
917            _ => {}
918        }
919    }
920
921    val.to_array_of_size(len)
922}
923
/// The various listing tables do not attempt to read all files
925/// concurrently, instead they will read files in sequence within a
926/// partition.  This is an important property as it allows plans to
927/// run against 1000s of files and not try to open them all
928/// concurrently.
929///
930/// However, it means if we assign more than one file to a partition
931/// the output sort order will not be preserved as illustrated in the
932/// following diagrams:
933///
934/// When only 1 file is assigned to each partition, each partition is
935/// correctly sorted on `(A, B, C)`
936///
937/// ```text
938///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
939///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
940///┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
941///  │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
942///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
943///  │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
944///┃                                          │                    │                     ┃
945///  │                   │ │                    │                    │                 │
946///┃                                          │                    │                     ┃
947///  │                   │ │                    │                    │                 │
948///┃                                          │                    │                     ┃
949///  │                   │ │                    │                    │                 │
950///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
951///     DataFusion           DataFusion           DataFusion           DataFusion
952///┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
953/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
954///
955///                                      DataSourceExec
956///```
957///
958/// However, when more than 1 file is assigned to each partition, each
959/// partition is NOT correctly sorted on `(A, B, C)`. Once the second
960/// file is scanned, the same values for A, B and C can be repeated in
961/// the same sorted stream
962///
963///```text
964///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
965///  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
966///┃   ┌───────────────┐     ┌──────────────┐ │
967///  │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
968///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
969///  │ └───────────────┘ │ │ └──────────────┘   ┃
970///┃   ┌───────────────┐     ┌──────────────┐ │
971///  │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
972///┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
973///  │ └───────────────┘ │ │ └──────────────┘   ┃
974///┃                                          │
975///  │                   │ │                    ┃
976///┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
977///     DataFusion           DataFusion         ┃
978///┃    Partition 1          Partition 2
979/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
980///
981///              DataSourceExec
982///```
983fn get_projected_output_ordering(
984    base_config: &FileScanConfig,
985    projected_schema: &SchemaRef,
986) -> Vec<LexOrdering> {
987    let mut all_orderings = vec![];
988    for output_ordering in &base_config.output_ordering {
989        let mut new_ordering = LexOrdering::default();
990        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
991            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
992                let name = col.name();
993                if let Some((idx, _)) = projected_schema.column_with_name(name) {
994                    // Compute the new sort expression (with correct index) after projection:
995                    new_ordering.push(PhysicalSortExpr {
996                        expr: Arc::new(Column::new(name, idx)),
997                        options: *options,
998                    });
999                    continue;
1000                }
1001            }
1002            // Cannot find expression in the projected_schema, stop iterating
1003            // since rest of the orderings are violated
1004            break;
1005        }
1006
1007        // do not push empty entries
1008        // otherwise we may have `Some(vec![])` at the output ordering.
1009        if new_ordering.is_empty() {
1010            continue;
1011        }
1012
1013        // Check if any file groups are not sorted
1014        if base_config.file_groups.iter().any(|group| {
1015            if group.len() <= 1 {
1016                // File groups with <= 1 files are always sorted
1017                return false;
1018            }
1019
1020            let statistics = match MinMaxStatistics::new_from_files(
1021                &new_ordering,
1022                projected_schema,
1023                base_config.projection.as_deref(),
1024                group,
1025            ) {
1026                Ok(statistics) => statistics,
1027                Err(e) => {
1028                    log::trace!("Error fetching statistics for file group: {e}");
1029                    // we can't prove that it's ordered, so we have to reject it
1030                    return true;
1031                }
1032            };
1033
1034            !statistics.is_sorted()
1035        }) {
1036            debug!(
1037                "Skipping specified output ordering {:?}. \
1038                Some file groups couldn't be determined to be sorted: {:?}",
1039                base_config.output_ordering[0], base_config.file_groups
1040            );
1041            continue;
1042        }
1043
1044        all_orderings.push(new_ordering);
1045    }
1046    all_orderings
1047}
1048
1049/// Convert type to a type suitable for use as a `ListingTable`
1050/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
1051/// a reasonable trade off between a reasonable number of partition
1052/// values and space efficiency.
1053///
1054/// This use this to specify types for partition columns. However
1055/// you MAY also choose not to dictionary-encode the data or to use a
1056/// different dictionary type.
1057///
1058/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same say.
1059pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1060    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1061}
1062
1063/// Convert a [`ScalarValue`] of partition columns to a type, as
1064/// described in the documentation of [`wrap_partition_type_in_dict`],
1065/// which can wrap the types.
1066pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1067    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1068}
1069
1070#[cfg(test)]
1071mod tests {
1072    use crate::{test_util::MockSource, tests::aggr_test_schema};
1073
1074    use super::*;
1075    use arrow::{
1076        array::{Int32Array, RecordBatch},
1077        compute::SortOptions,
1078    };
1079
1080    use datafusion_common::stats::Precision;
1081    use datafusion_common::{assert_batches_eq, DFSchema};
1082    use datafusion_expr::{execution_props::ExecutionProps, SortExpr};
1083    use datafusion_physical_expr::create_physical_expr;
1084    use std::collections::HashMap;
1085
1086    fn create_physical_sort_expr(
1087        e: &SortExpr,
1088        input_dfschema: &DFSchema,
1089        execution_props: &ExecutionProps,
1090    ) -> Result<PhysicalSortExpr> {
1091        let SortExpr {
1092            expr,
1093            asc,
1094            nulls_first,
1095        } = e;
1096        Ok(PhysicalSortExpr {
1097            expr: create_physical_expr(expr, input_dfschema, execution_props)?,
1098            options: SortOptions {
1099                descending: !asc,
1100                nulls_first: *nulls_first,
1101            },
1102        })
1103    }
1104
1105    /// Returns the column names on the schema
1106    pub fn columns(schema: &Schema) -> Vec<String> {
1107        schema.fields().iter().map(|f| f.name().clone()).collect()
1108    }
1109
1110    #[test]
1111    fn physical_plan_config_no_projection() {
1112        let file_schema = aggr_test_schema();
1113        let conf = config_for_projection(
1114            Arc::clone(&file_schema),
1115            None,
1116            Statistics::new_unknown(&file_schema),
1117            to_partition_cols(vec![(
1118                "date".to_owned(),
1119                wrap_partition_type_in_dict(DataType::Utf8),
1120            )]),
1121        );
1122
1123        let (proj_schema, _, proj_statistics, _) = conf.project();
1124        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1125        assert_eq!(
1126            proj_schema.field(file_schema.fields().len()).name(),
1127            "date",
1128            "partition columns are the last columns"
1129        );
1130        assert_eq!(
1131            proj_statistics.column_statistics.len(),
1132            file_schema.fields().len() + 1
1133        );
1134        // TODO implement tests for partition column statistics once implemented
1135
1136        let col_names = conf.projected_file_column_names();
1137        assert_eq!(col_names, None);
1138
1139        let col_indices = conf.file_column_projection_indices();
1140        assert_eq!(col_indices, None);
1141    }
1142
1143    #[test]
1144    fn physical_plan_config_no_projection_tab_cols_as_field() {
1145        let file_schema = aggr_test_schema();
1146
1147        // make a table_partition_col as a field
1148        let table_partition_col =
1149            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1150                .with_metadata(HashMap::from_iter(vec![(
1151                    "key_whatever".to_owned(),
1152                    "value_whatever".to_owned(),
1153                )]));
1154
1155        let conf = config_for_projection(
1156            Arc::clone(&file_schema),
1157            None,
1158            Statistics::new_unknown(&file_schema),
1159            vec![table_partition_col.clone()],
1160        );
1161
1162        // verify the proj_schema includes the last column and exactly the same the field it is defined
1163        let (proj_schema, _, _, _) = conf.project();
1164        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1165        assert_eq!(
1166            *proj_schema.field(file_schema.fields().len()),
1167            table_partition_col,
1168            "partition columns are the last columns and ust have all values defined in created field"
1169        );
1170    }
1171
1172    #[test]
1173    fn physical_plan_config_with_projection() {
1174        let file_schema = aggr_test_schema();
1175        let conf = config_for_projection(
1176            Arc::clone(&file_schema),
1177            Some(vec![file_schema.fields().len(), 0]),
1178            Statistics {
1179                num_rows: Precision::Inexact(10),
1180                // assign the column index to distinct_count to help assert
1181                // the source statistic after the projection
1182                column_statistics: (0..file_schema.fields().len())
1183                    .map(|i| ColumnStatistics {
1184                        distinct_count: Precision::Inexact(i),
1185                        ..Default::default()
1186                    })
1187                    .collect(),
1188                total_byte_size: Precision::Absent,
1189            },
1190            to_partition_cols(vec![(
1191                "date".to_owned(),
1192                wrap_partition_type_in_dict(DataType::Utf8),
1193            )]),
1194        );
1195
1196        let (proj_schema, _, proj_statistics, _) = conf.project();
1197        assert_eq!(
1198            columns(&proj_schema),
1199            vec!["date".to_owned(), "c1".to_owned()]
1200        );
1201        let proj_stat_cols = proj_statistics.column_statistics;
1202        assert_eq!(proj_stat_cols.len(), 2);
1203        // TODO implement tests for proj_stat_cols[0] once partition column
1204        // statistics are implemented
1205        assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));
1206
1207        let col_names = conf.projected_file_column_names();
1208        assert_eq!(col_names, Some(vec!["c1".to_owned()]));
1209
1210        let col_indices = conf.file_column_projection_indices();
1211        assert_eq!(col_indices, Some(vec![0]));
1212    }
1213
    /// Projects several file batches through one `PartitionColumnProjector`
    /// and checks that the selected partition columns (`year`, `day`) are
    /// appended with the expected values.
    ///
    /// The same projector is reused for batches of 3, 5, 3 and 3 rows. The
    /// projector passes its `key_buffer_cache` to `create_output_array`, so
    /// the order of batch sizes matters: a larger batch after a smaller one
    /// and a smaller batch after a larger one are both covered.
    #[test]
    fn partition_column_projector() {
        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        // three partition columns are declared; only two are projected below
        let partition_cols = vec![
            (
                "year".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "month".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
            (
                "day".to_owned(),
                wrap_partition_type_in_dict(DataType::Utf8),
            ),
        ];
        // create a projected schema
        let statistics = Statistics {
            num_rows: Precision::Inexact(3),
            total_byte_size: Precision::Absent,
            column_statistics: Statistics::unknown_column(&file_batch.schema()),
        };

        let conf = config_for_projection(
            file_batch.schema(),
            // keep all cols from file and 2 from partitioning
            // (`len` selects `year`, `len + 2` selects `day`; `month` is skipped)
            Some(vec![
                0,
                1,
                2,
                file_batch.schema().fields().len(),
                file_batch.schema().fields().len() + 2,
            ]),
            statistics.clone(),
            to_partition_cols(partition_cols.clone()),
        );

        let source_statistics = conf.file_source.statistics().unwrap();
        let conf_stats = conf.statistics().unwrap();

        // the config statistics keep the row count after projection
        assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

        // 3 original statistics + 2 partition statistics
        assert_eq!(conf_stats.column_statistics.len(), 5);

        // file statistics should not be modified
        assert_eq!(source_statistics, statistics);
        assert_eq!(source_statistics.column_statistics.len(), 3);

        let (proj_schema, ..) = conf.project();
        // created a projector for that projected schema
        let mut proj = PartitionColumnProjector::new(
            proj_schema,
            &partition_cols
                .iter()
                .map(|x| x.0.clone())
                .collect::<Vec<_>>(),
        );

        // project first batch
        let projected_batch = proj
            .project(
                // file_batch is ok here because we kept all the file cols in the projection
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("26")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        // project another batch that is larger than the previous one
        let file_batch = build_table_i32(
            ("a", &vec![5, 6, 7, 8, 9]),
            ("b", &vec![-10, -9, -8, -7, -6]),
            ("c", &vec![12, 13, 14, 15, 16]),
        );
        let projected_batch = proj
            .project(
                // file_batch is ok here because we kept all the file cols in the projection
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("27")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+-----+----+------+-----+",
            "| a | b   | c  | year | day |",
            "+---+-----+----+------+-----+",
            "| 5 | -10 | 12 | 2021 | 27  |",
            "| 6 | -9  | 13 | 2021 | 27  |",
            "| 7 | -8  | 14 | 2021 | 27  |",
            "| 8 | -7  | 15 | 2021 | 27  |",
            "| 9 | -6  | 16 | 2021 | 27  |",
            "+---+-----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        // project another batch that is smaller than the previous one
        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 3]),
            ("b", &vec![2, 3, 4]),
            ("c", &vec![4, 5, 6]),
        );
        let projected_batch = proj
            .project(
                // file_batch is ok here because we kept all the file cols in the projection
                file_batch,
                &[
                    wrap_partition_value_in_dict(ScalarValue::from("2021")),
                    wrap_partition_value_in_dict(ScalarValue::from("10")),
                    wrap_partition_value_in_dict(ScalarValue::from("28")),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+---+---+------+-----+",
            "| a | b | c | year | day |",
            "+---+---+---+------+-----+",
            "| 0 | 2 | 4 | 2021 | 28  |",
            "| 1 | 3 | 5 | 2021 | 28  |",
            "| 3 | 4 | 6 | 2021 | 28  |",
            "+---+---+---+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);

        // forgot to dictionary-wrap the scalar value: the projector is
        // expected to wrap it itself and still produce the same output
        let file_batch = build_table_i32(
            ("a", &vec![0, 1, 2]),
            ("b", &vec![-2, -1, 0]),
            ("c", &vec![10, 11, 12]),
        );
        let projected_batch = proj
            .project(
                // file_batch is ok here because we kept all the file cols in the projection
                file_batch,
                &[
                    ScalarValue::from("2021"),
                    ScalarValue::from("10"),
                    ScalarValue::from("26"),
                ],
            )
            .expect("Projection of partition columns into record batch failed");
        let expected = [
            "+---+----+----+------+-----+",
            "| a | b  | c  | year | day |",
            "+---+----+----+------+-----+",
            "| 0 | -2 | 10 | 2021 | 26  |",
            "| 1 | -1 | 11 | 2021 | 26  |",
            "| 2 | 0  | 12 | 2021 | 26  |",
            "+---+----+----+------+-----+",
        ];
        assert_batches_eq!(expected, &[projected_batch]);
    }
1388
1389    #[test]
1390    fn test_projected_file_schema_with_partition_col() {
1391        let schema = aggr_test_schema();
1392        let partition_cols = vec![
1393            (
1394                "part1".to_owned(),
1395                wrap_partition_type_in_dict(DataType::Utf8),
1396            ),
1397            (
1398                "part2".to_owned(),
1399                wrap_partition_type_in_dict(DataType::Utf8),
1400            ),
1401        ];
1402
1403        // Projected file schema for config with projection including partition column
1404        let projection = config_for_projection(
1405            schema.clone(),
1406            Some(vec![0, 3, 5, schema.fields().len()]),
1407            Statistics::new_unknown(&schema),
1408            to_partition_cols(partition_cols),
1409        )
1410        .projected_file_schema();
1411
1412        // Assert partition column filtered out in projected file schema
1413        let expected_columns = vec!["c1", "c4", "c6"];
1414        let actual_columns = projection
1415            .fields()
1416            .iter()
1417            .map(|f| f.name().clone())
1418            .collect::<Vec<_>>();
1419        assert_eq!(expected_columns, actual_columns);
1420    }
1421
1422    #[test]
1423    fn test_projected_file_schema_without_projection() {
1424        let schema = aggr_test_schema();
1425        let partition_cols = vec![
1426            (
1427                "part1".to_owned(),
1428                wrap_partition_type_in_dict(DataType::Utf8),
1429            ),
1430            (
1431                "part2".to_owned(),
1432                wrap_partition_type_in_dict(DataType::Utf8),
1433            ),
1434        ];
1435
1436        // Projected file schema for config without projection
1437        let projection = config_for_projection(
1438            schema.clone(),
1439            None,
1440            Statistics::new_unknown(&schema),
1441            to_partition_cols(partition_cols),
1442        )
1443        .projected_file_schema();
1444
1445        // Assert projected file schema is equal to file schema
1446        assert_eq!(projection.fields(), schema.fields());
1447    }
1448
1449    #[test]
1450    fn test_split_groups_by_statistics() -> Result<()> {
1451        use chrono::TimeZone;
1452        use datafusion_common::DFSchema;
1453        use datafusion_expr::execution_props::ExecutionProps;
1454        use object_store::{path::Path, ObjectMeta};
1455
1456        struct File {
1457            name: &'static str,
1458            date: &'static str,
1459            statistics: Vec<Option<(f64, f64)>>,
1460        }
1461        impl File {
1462            fn new(
1463                name: &'static str,
1464                date: &'static str,
1465                statistics: Vec<Option<(f64, f64)>>,
1466            ) -> Self {
1467                Self {
1468                    name,
1469                    date,
1470                    statistics,
1471                }
1472            }
1473        }
1474
1475        struct TestCase {
1476            name: &'static str,
1477            file_schema: Schema,
1478            files: Vec<File>,
1479            sort: Vec<SortExpr>,
1480            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1481        }
1482
1483        use datafusion_expr::col;
1484        let cases = vec![
1485            TestCase {
1486                name: "test sort",
1487                file_schema: Schema::new(vec![Field::new(
1488                    "value".to_string(),
1489                    DataType::Float64,
1490                    false,
1491                )]),
1492                files: vec![
1493                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1494                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1495                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1496                ],
1497                sort: vec![col("value").sort(true, false)],
1498                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1499            },
1500            // same input but file '2' is in the middle
1501            // test that we still order correctly
1502            TestCase {
1503                name: "test sort with files ordered differently",
1504                file_schema: Schema::new(vec![Field::new(
1505                    "value".to_string(),
1506                    DataType::Float64,
1507                    false,
1508                )]),
1509                files: vec![
1510                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1511                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1512                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1513                ],
1514                sort: vec![col("value").sort(true, false)],
1515                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1516            },
1517            TestCase {
1518                name: "reverse sort",
1519                file_schema: Schema::new(vec![Field::new(
1520                    "value".to_string(),
1521                    DataType::Float64,
1522                    false,
1523                )]),
1524                files: vec![
1525                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1526                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1527                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1528                ],
1529                sort: vec![col("value").sort(false, true)],
1530                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1531            },
1532            // reject nullable sort columns
1533            TestCase {
1534                name: "no nullable sort columns",
1535                file_schema: Schema::new(vec![Field::new(
1536                    "value".to_string(),
1537                    DataType::Float64,
1538                    true, // should fail because nullable
1539                )]),
1540                files: vec![
1541                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1542                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1543                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1544                ],
1545                sort: vec![col("value").sort(true, false)],
1546                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
1547            },
1548            TestCase {
1549                name: "all three non-overlapping",
1550                file_schema: Schema::new(vec![Field::new(
1551                    "value".to_string(),
1552                    DataType::Float64,
1553                    false,
1554                )]),
1555                files: vec![
1556                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1557                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
1558                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
1559                ],
1560                sort: vec![col("value").sort(true, false)],
1561                expected_result: Ok(vec![vec!["0", "1", "2"]]),
1562            },
1563            TestCase {
1564                name: "all three overlapping",
1565                file_schema: Schema::new(vec![Field::new(
1566                    "value".to_string(),
1567                    DataType::Float64,
1568                    false,
1569                )]),
1570                files: vec![
1571                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1572                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1573                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
1574                ],
1575                sort: vec![col("value").sort(true, false)],
1576                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
1577            },
1578            TestCase {
1579                name: "empty input",
1580                file_schema: Schema::new(vec![Field::new(
1581                    "value".to_string(),
1582                    DataType::Float64,
1583                    false,
1584                )]),
1585                files: vec![],
1586                sort: vec![col("value").sort(true, false)],
1587                expected_result: Ok(vec![]),
1588            },
1589            TestCase {
1590                name: "one file missing statistics",
1591                file_schema: Schema::new(vec![Field::new(
1592                    "value".to_string(),
1593                    DataType::Float64,
1594                    false,
1595                )]),
1596                files: vec![
1597                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1598                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1599                    File::new("2", "2023-01-02", vec![None]),
1600                ],
1601                sort: vec![col("value").sort(true, false)],
1602                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
1603            },
1604        ];
1605
1606        for case in cases {
1607            let table_schema = Arc::new(Schema::new(
1608                case.file_schema
1609                    .fields()
1610                    .clone()
1611                    .into_iter()
1612                    .cloned()
1613                    .chain(Some(Arc::new(Field::new(
1614                        "date".to_string(),
1615                        DataType::Utf8,
1616                        false,
1617                    ))))
1618                    .collect::<Vec<_>>(),
1619            ));
1620            let sort_order = LexOrdering::from(
1621                case.sort
1622                    .into_iter()
1623                    .map(|expr| {
1624                        create_physical_sort_expr(
1625                            &expr,
1626                            &DFSchema::try_from(table_schema.as_ref().clone())?,
1627                            &ExecutionProps::default(),
1628                        )
1629                    })
1630                    .collect::<Result<Vec<_>>>()?,
1631            );
1632
1633            let partitioned_files =
1634                case.files.into_iter().map(From::from).collect::<Vec<_>>();
1635            let result = FileScanConfig::split_groups_by_statistics(
1636                &table_schema,
1637                &[partitioned_files.clone()],
1638                &sort_order,
1639            );
1640            let results_by_name = result
1641                .as_ref()
1642                .map(|file_groups| {
1643                    file_groups
1644                        .iter()
1645                        .map(|file_group| {
1646                            file_group
1647                                .iter()
1648                                .map(|file| {
1649                                    partitioned_files
1650                                        .iter()
1651                                        .find_map(|f| {
1652                                            if f.object_meta == file.object_meta {
1653                                                Some(
1654                                                    f.object_meta
1655                                                        .location
1656                                                        .as_ref()
1657                                                        .rsplit('/')
1658                                                        .next()
1659                                                        .unwrap()
1660                                                        .trim_end_matches(".parquet"),
1661                                                )
1662                                            } else {
1663                                                None
1664                                            }
1665                                        })
1666                                        .unwrap()
1667                                })
1668                                .collect::<Vec<_>>()
1669                        })
1670                        .collect::<Vec<_>>()
1671                })
1672                .map_err(|e| e.strip_backtrace().leak() as &'static str);
1673
1674            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
1675        }
1676
1677        return Ok(());
1678
1679        impl From<File> for PartitionedFile {
1680            fn from(file: File) -> Self {
1681                PartitionedFile {
1682                    object_meta: ObjectMeta {
1683                        location: Path::from(format!(
1684                            "data/date={}/{}.parquet",
1685                            file.date, file.name
1686                        )),
1687                        last_modified: chrono::Utc.timestamp_nanos(0),
1688                        size: 0,
1689                        e_tag: None,
1690                        version: None,
1691                    },
1692                    partition_values: vec![ScalarValue::from(file.date)],
1693                    range: None,
1694                    statistics: Some(Statistics {
1695                        num_rows: Precision::Absent,
1696                        total_byte_size: Precision::Absent,
1697                        column_statistics: file
1698                            .statistics
1699                            .into_iter()
1700                            .map(|stats| {
1701                                stats
1702                                    .map(|(min, max)| ColumnStatistics {
1703                                        min_value: Precision::Exact(ScalarValue::from(
1704                                            min,
1705                                        )),
1706                                        max_value: Precision::Exact(ScalarValue::from(
1707                                            max,
1708                                        )),
1709                                        ..Default::default()
1710                                    })
1711                                    .unwrap_or_default()
1712                            })
1713                            .collect::<Vec<_>>(),
1714                    }),
1715                    extensions: None,
1716                    metadata_size_hint: None,
1717                }
1718            }
1719        }
1720    }
1721
1722    // sets default for configs that play no role in projections
1723    fn config_for_projection(
1724        file_schema: SchemaRef,
1725        projection: Option<Vec<usize>>,
1726        statistics: Statistics,
1727        table_partition_cols: Vec<Field>,
1728    ) -> FileScanConfig {
1729        FileScanConfig::new(
1730            ObjectStoreUrl::parse("test:///").unwrap(),
1731            file_schema,
1732            Arc::new(MockSource::default()),
1733        )
1734        .with_projection(projection)
1735        .with_statistics(statistics)
1736        .with_table_partition_cols(table_partition_cols)
1737    }
1738
1739    /// Convert partition columns from Vec<String DataType> to Vec<Field>
1740    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
1741        table_partition_cols
1742            .iter()
1743            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
1744            .collect::<Vec<_>>()
1745    }
1746
1747    /// returns record batch with 3 columns of i32 in memory
1748    pub fn build_table_i32(
1749        a: (&str, &Vec<i32>),
1750        b: (&str, &Vec<i32>),
1751        c: (&str, &Vec<i32>),
1752    ) -> RecordBatch {
1753        let schema = Schema::new(vec![
1754            Field::new(a.0, DataType::Int32, false),
1755            Field::new(b.0, DataType::Int32, false),
1756            Field::new(c.0, DataType::Int32, false),
1757        ]);
1758
1759        RecordBatch::try_new(
1760            Arc::new(schema),
1761            vec![
1762                Arc::new(Int32Array::from(a.1.clone())),
1763                Arc::new(Int32Array::from(b.1.clone())),
1764                Arc::new(Int32Array::from(c.1.clone())),
1765            ],
1766        )
1767        .unwrap()
1768    }
1769}